Skip to content

Commit

Permalink
feat: Add Node for Motion Planning
Browse files Browse the repository at this point in the history
  • Loading branch information
JuliusMiller committed Dec 14, 2023
1 parent 60979a5 commit a3a3815
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 3 deletions.
6 changes: 3 additions & 3 deletions code/planning/global_planner/launch/global_planner.launch
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<launch>
<!--
<node pkg="planning" type="dev_global_route.py" name="DevGlobalRoute" output="screen">

<!-- <node pkg="planning" type="dev_global_route.py" name="DevGlobalRoute" output="screen">
<param name="from_txt" value="True" />
<param name="sampling_resolution" value="75.0" />
<param name="routes" value="/opt/leaderboard/data/routes_devtest.xml" />
<param name="global_route_txt" value="/code/planning/global_planner/src/global_route.txt" />
<param name="role_name" value="hero" />
</node>-->
</node> -->
<node pkg="planning" type="global_planner.py" name="PrePlanner" output="screen">
<param name="role_name" value="hero" />
<param name="control_loop_rate" value="1" />
Expand Down
5 changes: 5 additions & 0 deletions code/planning/local_planner/launch/local_planner.launch
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@
<param name="role_name" value="hero" />
<param name="control_loop_rate" value="1" />
</node>

<node pkg="local_planner" type="motion_planning.py" name="MotionPlanning" output="screen">
<param name="role_name" value="hero" />
<param name="control_loop_rate" value="1" />
</node>
</launch>
209 changes: 209 additions & 0 deletions code/planning/local_planner/src/motion_planning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
#!/usr/bin/env python
# import rospy
# import tf.transformations
import ros_compatibility as roscomp
from ros_compatibility.node import CompatibleNode
from rospy import Publisher, Subscriber
from std_msgs.msg import String, Float32
import numpy as np

# from behavior_agent.msg import BehaviorSpeed
from perception.msg import Waypoint, LaneChange
import behavior_speed as bs

# from geometry_msgs.msg import PoseStamped, Pose, Point, Quaternion
# from carla_msgs.msg import CarlaRoute # , CarlaWorldInfo
# from nav_msgs.msg import Path
# from std_msgs.msg import String
# from std_msgs.msg import Float32MultiArray


def convert_to_ms(speed):
return speed / 3.6


class MotionPlanning(CompatibleNode):
"""
This node selects speeds according to the behavior in the Decision Tree
and the ACC.
Later this Node should compute a local Trajectory and forward
it to the Acting.
"""

def __init__(self):
super(MotionPlanning, self).__init__('MotionPlanning')
self.role_name = self.get_param("role_name", "hero")
self.control_loop_rate = self.get_param("control_loop_rate", 0.5)
self.logdebug("MotionPlanning started")

self.target_speed = 0.0
self.__curr_behavior = None
self.__acc_speed = 0.0
self.__stopline = None # (Distance, isStopline)
self.__change_point = None # (Distance, isLaneChange, roadOption)

# Subscriber
self.curr_behavior_sub: Subscriber = self.new_subscription(
String,
f"/paf/{self.role_name}/curr_behavior",
self.__set_curr_behavior,
qos_profile=1)

self.acc_sub: Subscriber = self.new_subscription(
Float32,
f"/paf/{self.role_name}/acc_velocity",
self.__set_acc_speed,
qos_profile=1)

self.stopline_sub: Subscriber = self.new_subscription(
Waypoint,
f"/paf/{self.role_name}/waypoint_distance",
self.__set_stopline,
qos_profile=1)

self.change_point_sub: Subscriber = self.new_subscription(
LaneChange,
f"/paf/{self.role_name}/lane_change_distance",
self.__set_change_point,
qos_profile=1)

# Publisher
self.velocity_pub: Publisher = self.new_publisher(
Float32,
f"/paf/{self.role_name}/target_velocity",
qos_profile=1)

def update_target_speed(self):
be_speed = self.get_speed_by_behavior(self.__curr_behavior)

self.target_speed = min(be_speed, self.__acc_speed)
self.velocity_pub.publish(self.target_speed)
self.logerr(self.target_speed)

def __set_acc_speed(self, data: Float32):
self.__acc_speed = data.data

def __set_curr_behavior(self, data: String):
self.__curr_behavior = data.data
# self.logerr(self.__curr_behavior)

def __set_stopline(self, data: Waypoint) -> float:
if data is not None:
self.__stopline = (data.distance, data.isStopLine)

def __set_change_point(self, data: LaneChange):
if data is not None:
self.__change_point = \
(data.distance, data.isLaneChange, data.roadOption)

def get_speed_by_behavior(self, behavior: str) -> float:
speed = 0.0
split = "_"
short_behavior = behavior.partition(split)[0]
if short_behavior == "int":
speed = self.__get_speed_intersection(behavior)
elif short_behavior == "lc":
speed = self.__get_speed_lanechange(behavior)
else:
speed = self.__get_speed_cruise()

return speed

def __get_speed_intersection(self, behavior: str) -> float:
speed = 0.0
if behavior == bs.int_app_init.name:
speed = bs.int_app_init.speed
elif behavior == bs.int_app_green.name:
speed = bs.int_app_green.speed
elif behavior == bs.int_app_no_sign.name:
speed = self.__calc_speed_to_stop_intersection()
elif behavior == bs.int_wait.name:
speed == bs.int_wait.speed
elif behavior == bs.int_enter_no_light:
speed = bs.int_enter_no_light.speed
elif behavior == bs.int_enter_empty_str.name:
speed = bs.int_enter_empty_str.speed
elif behavior == bs.int_enter_light.name:
speed == bs.int_enter_light.speed
elif behavior == bs.int_exit:
speed = bs.int_exit.speed

return speed

def __get_speed_lanechange(self, behavior: str) -> float:
speed = 0.0
if behavior == bs.lc_app_init.name:
speed = bs.lc_app_init.speed
elif behavior == bs.lc_app_blocked.name:
speed = bs.lc_app_blocked.speed # calc_speed_to_stop_lanechange()
elif behavior == bs.lc_enter_init.name:
speed = bs.lc_enter_init.speed
elif behavior == bs.lc_exit.name:
speed = bs.lc_exit.speed

return speed

def __get_speed_cruise(self) -> float:
return self.__acc_speed

def __calc_speed_to_stop_intersection(self) -> float:
target_distance = 3.0
virtual_stopline_distance = self.__calc_virtual_stopline()
# calculate speed needed for stopping
v_stop = max(convert_to_ms(10.),
convert_to_ms((virtual_stopline_distance / 30)
* 50))
if v_stop > convert_to_ms(50.0):
v_stop = convert_to_ms(50.0)
if virtual_stopline_distance < target_distance:
v_stop = 0.0

# TODO: Find out purpose
def __calc_speed_to_stop_lanechange(self) -> float:
if self.__change_point[0] != np.inf and self.__change_point[1]:
stopline = self.__change_point[0]
else:
return 100

v_stop = max(convert_to_ms(5.),
convert_to_ms((stopline / 30) ** 1.5
* 50))
if v_stop > convert_to_ms(50.0):
v_stop = convert_to_ms(30.0)
return v_stop

def __calc_virtual_stopline(self) -> float:
if self.__stopline[0] != np.inf and self.__stopline[1]:
return self.__stopline[0]
elif self.traffic_light_detected:
return self.traffic_light_distance
else:
return 0.0

def run(self):
"""
Control loop
:return:
"""

def loop(timer_event=None):
self.update_target_speed()

self.new_timer(self.control_loop_rate, loop)
self.spin()


if __name__ == "__main__":
"""
main function starts the MotionPlanning node
:param args:
"""
roscomp.init('MotionPlanning')

try:
node = MotionPlanning()
node.run()
except KeyboardInterrupt:
pass
finally:
roscomp.shutdown()

0 comments on commit a3a3815

Please sign in to comment.