Skip to content

Commit

Permalink
Merge branch 'main' into 362-research-current-architecture-of-plannin…
Browse files Browse the repository at this point in the history
…g-component
  • Loading branch information
johannaschwarz authored Nov 3, 2024
2 parents d9e1147 + cae64fd commit a4f9eac
Show file tree
Hide file tree
Showing 8 changed files with 645 additions and 105 deletions.
125 changes: 91 additions & 34 deletions code/planning/src/local_planner/motion_planning.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
#!/usr/bin/env python
# import tf.transformations
import ros_compatibility as roscomp
import rospy
import sys
import math
import os
import sys

import numpy as np
import ros_compatibility as roscomp
import rospy
from carla_msgs.msg import CarlaSpeedometer
from geometry_msgs.msg import Point, Pose, PoseStamped, Quaternion
from nav_msgs.msg import Path
from perception.msg import LaneChange, Waypoint
from ros_compatibility.node import CompatibleNode
from rospy import Publisher, Subscriber
from std_msgs.msg import String, Float32, Bool, Float32MultiArray, Int16
from nav_msgs.msg import Path
from geometry_msgs.msg import PoseStamped, Pose, Point, Quaternion
from carla_msgs.msg import CarlaSpeedometer
import numpy as np
from scipy.spatial.transform import Rotation
import math

from perception.msg import Waypoint, LaneChange

from utils import convert_to_ms, spawn_car, NUM_WAYPOINTS, TARGET_DISTANCE_TO_STOP
from std_msgs.msg import Bool, Float32, Float32MultiArray, Int16, String
from utils import NUM_WAYPOINTS, TARGET_DISTANCE_TO_STOP, convert_to_ms, spawn_car

sys.path.append(os.path.abspath(sys.path[0] + "/../../planning/src/behavior_agent"))
from behaviours import behavior_speed as bs # type: ignore # noqa: E402
Expand All @@ -30,23 +28,24 @@

class MotionPlanning(CompatibleNode):
"""
This node selects speeds according to the behavior in the Decision Tree
and the ACC.
Later this Node should compute a local Trajectory and forward
it to the Acting.
This node selects speeds based on the current behaviour and ACC to forward to the
acting components. It also handles the generation of trajectories for overtaking
maneuvers.
"""

def __init__(self):
super(MotionPlanning, self).__init__("MotionPlanning")
self.role_name = self.get_param("role_name", "hero")
self.control_loop_rate = self.get_param("control_loop_rate", 0.05)

# TODO: add type hints
self.target_speed = 0.0
self.__curr_behavior = None
self.__acc_speed = 0.0
self.__stopline = None # (Distance, isStopline)
self.__change_point = None # (Distance, isLaneChange, roadOption)
self.__collision_point = None
# TODO: clarify what the overtake_status values mean (by using an enum or ...)
self.__overtake_status = -1
self.published = False
self.current_pos = None
Expand Down Expand Up @@ -162,9 +161,11 @@ def __init__(self):
Float32, f"/paf/{self.role_name}/target_velocity", qos_profile=1
)

# TODO move up to subscribers
self.wp_subs = self.new_subscription(
Float32, f"/paf/{self.role_name}/current_wp", self.__set_wp, qos_profile=1
)

self.overtake_success_pub = self.new_publisher(
Float32, f"/paf/{self.role_name}/overtake_success", qos_profile=1
)
Expand Down Expand Up @@ -235,12 +236,31 @@ def change_trajectory(self, distance_obj):
pose_list = self.trajectory.poses

# Only use fallback
self.overtake_fallback(distance_obj, pose_list)
self.generate_overtake_trajectory(distance_obj, pose_list)
self.__overtake_status = 1
self.overtake_success_pub.publish(self.__overtake_status)
return

def overtake_fallback(self, distance, pose_list, unstuck=False):
def generate_overtake_trajectory(self, distance, pose_list, unstuck=False):
"""
Generates a trajectory for overtaking maneuvers.
This method creates a new trajectory for the vehicle to follow when an
overtaking maneuver is required. It adjusts the waypoints based on the current
waypoint and distance, and applies an offset to the waypoints to create a path
that avoids obstacles or gets the vehicle unstuck.
Args:
distance (int): The distance over which the overtaking maneuver should be
planned.
pose_list (list): A list of PoseStamped objects representing the current
planned path.
unstuck (bool, optional): A flag indicating whether the vehicle is stuck
and requires a larger offset to get unstuck. Defaults to False.
Returns:
None: The method updates the self.trajectory attribute with the new path.
"""
currentwp = self.current_wp
normal_x_offset = 2
unstuck_x_offset = 3 # could need adjustment with better steering
Expand Down Expand Up @@ -306,10 +326,25 @@ def __set_trajectory(self, data: Path):
"""
self.trajectory = data
self.loginfo("Trajectory received")

self.__corners = self.__calc_corner_points()

def __calc_corner_points(self):
def __calc_corner_points(self) -> list[list[np.ndarray]]:
"""
Calculate the corner points of the trajectory.
This method converts the poses in the trajectory to an array of coordinates,
calculates the angles between consecutive points, and identifies the points
where there is a significant change in the angle, indicating a corner or curve.
The corner points are then grouped by proximity and returned.
Returns:
list: A list of lists, where each sublist contains 2D points that form a
corner.
"""
coords = self.convert_pose_to_array(np.array(self.trajectory.poses))

# TODO: refactor by using numpy functions
x_values = np.array([point[0] for point in coords])
y_values = np.array([point[1] for point in coords])

Expand All @@ -320,13 +355,25 @@ def __calc_corner_points(self):
threshold = 1 # in degree
curve_change_indices = np.where(np.abs(np.diff(angles)) > threshold)[0]

sublist = self.create_sublists(curve_change_indices, proximity=5)
sublist = self.group_points_by_proximity(curve_change_indices, proximity=5)

coords_of_curve = [coords[i] for i in sublist]

return coords_of_curve

def create_sublists(self, points, proximity=5):
def group_points_by_proximity(self, points, proximity=5):
"""
Groups a list of points into sublists based on their proximity to each other.
Args:
points (list): A list of points to be grouped.
proximity (int, optional): The maximum distance between points in a sublist
Returns:
list: A list of sublists, where each sublist contains points that are within
the specified proximity of each other. Sublists with only one point are
filtered out.
"""
sublists = []
current_sublist = []

Expand All @@ -345,6 +392,7 @@ def create_sublists(self, points, proximity=5):
if current_sublist:
sublists.append(current_sublist)

# TODO: Check if it is intended to filter out sublists with only one point
filtered_list = [in_list for in_list in sublists if len(in_list) > 1]

return filtered_list
Expand All @@ -354,6 +402,7 @@ def get_cornering_speed(self):
pos = self.current_pos[:2]

def euclid_dist(vector1, vector2):
# TODO replace with numpy function
point1 = np.array(vector1)
point2 = np.array(vector2)

Expand All @@ -369,7 +418,7 @@ def map_corner(dist):
elif dist < 50:
return 7
else:
8
8 # TODO add return

distance_corner = 0
for i in range(len(corner) - 1):
Expand All @@ -395,14 +444,14 @@ def map_corner(dist):
return self.__get_speed_cruise()

@staticmethod
def convert_pose_to_array(poses: np.array):
"""convert pose array to numpy array
def convert_pose_to_array(poses: np.ndarray) -> np.ndarray:
"""Convert an array of PoseStamped objects to a numpy array of positions.
Args:
poses (np.array): pose array
poses (np.ndarray): Array of PoseStamped objects.
Returns:
np.array: numpy array
np.ndarray: Numpy array of shape (n, 2) containing the x and y positions.
"""
result_array = np.empty((len(poses), 2))
for pose in range(len(poses)):
Expand All @@ -424,6 +473,11 @@ def __check_emergency(self, data: Bool):
self.emergency_pub.publish(data)

def update_target_speed(self, acc_speed, behavior):
"""
Updates the target velocity based on the current behavior and ACC velocity and
overtake status and publishes it. The unit of the velocity is m/s.
"""

be_speed = self.get_speed_by_behavior(behavior)
if behavior == bs.parking.name or self.__overtake_status == 1:
self.target_speed = be_speed
Expand All @@ -439,6 +493,10 @@ def __set_acc_speed(self, data: Float32):
self.__acc_speed = data.data

def __set_curr_behavior(self, data: String):
"""
Sets the received current behavior of the vehicle.
If the behavior is an overtake behavior, a trajectory change is triggered.
"""
self.__curr_behavior = data.data
if data.data == bs.ot_enter_init.name:
if np.isinf(self.__collision_point):
Expand Down Expand Up @@ -480,6 +538,7 @@ def get_speed_by_behavior(self, behavior: str) -> float:
return speed

def __get_speed_unstuck(self, behavior: str) -> float:
# TODO check if this 'global' is necessary
global UNSTUCK_OVERTAKE_FLAG_CLEAR_DISTANCE
speed = 0.0
if behavior == bs.us_unstuck.name:
Expand Down Expand Up @@ -507,7 +566,9 @@ def __get_speed_unstuck(self, behavior: str) -> float:
# create overtake trajectory starting 6 meteres before
# the obstacle
# 6 worked well in tests, but can be adjusted
self.overtake_fallback(self.unstuck_distance, pose_list, unstuck=True)
self.generate_overtake_trajectory(
self.unstuck_distance, pose_list, unstuck=True
)
self.logfatal("Overtake Trajectory while unstuck!")
self.unstuck_overtake_flag = True
self.init_overtake_pos = self.current_pos[:2]
Expand Down Expand Up @@ -628,8 +689,8 @@ def __calc_virtual_overtake(self) -> float:

def run(self):
"""
Control loop
:return:
Control loop that updates the target speed and publishes the target trajectory
and speed over ROS topics.
"""

def loop(timer_event=None):
Expand All @@ -649,10 +710,6 @@ def loop(timer_event=None):


if __name__ == "__main__":
"""
main function starts the MotionPlanning node
:param args:
"""
roscomp.init("MotionPlanning")
try:
node = MotionPlanning()
Expand Down
Loading

0 comments on commit a4f9eac

Please sign in to comment.