-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
631 feature execution check function for planned local trajectory #654
Changes from 30 commits
48ae661
576000a
f0441ad
46ce25f
a674d18
18490b3
c1424e4
1b07c0d
c84dfe3
c8d2cb8
9d463c5
467fca5
3e98b13
b32a303
6fbb236
3a0f48c
118e856
a5d498f
b603b1a
f91c9ac
bcdbbb4
8cb86c3
589ee7c
4a63321
6db0192
b5510aa
7312f34
2cf5796
7b03578
4182c57
aecbab9
2b028c9
4141310
2fa2121
9c6207c
ac8038b
ccac3ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
False | ||
True |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
|
||
import shapely | ||
from shapely import STRtree | ||
import shapely.plotting | ||
import numpy as np | ||
import numpy.typing as npt | ||
|
||
|
@@ -15,6 +16,7 @@ | |
from shapely.geometry import Polygon, LineString | ||
|
||
from mapping import msg | ||
from nav_msgs.msg import Path | ||
|
||
|
||
@dataclass | ||
|
@@ -176,6 +178,40 @@ def get_entities_with_coverage(self, polygon, entities: List[Entity], coverage): | |
|
||
return collision_entities | ||
|
||
def entities_in_area(self, radius, only_in_front=True) -> List[Entity]: | ||
"""Returns the entities without the hero car | ||
in a given radius around the hero car. | ||
Flag only_in_front decides if entities | ||
behind the car, are returend too. | ||
Only checks if the first entity is_hero | ||
|
||
Returns: | ||
List[Entity]: Entities in front and/or back of hero | ||
without the hero car in given radius | ||
""" | ||
entities = self.entities_without_hero() | ||
entities_in_area = [] | ||
|
||
if only_in_front: | ||
for entry in entities: | ||
distance = np.sqrt( | ||
np.power(entry.transform.translation().x(), 2) | ||
+ np.power(entry.transform.translation().y(), 2) | ||
) | ||
if entry.transform.translation().x() > 0 and distance < radius: | ||
entities_in_area.append(entry) | ||
return entities_in_area | ||
else: | ||
for entry in entities: | ||
distance = np.sqrt( | ||
np.power(entry.transform.translation().x(), 2) | ||
+ np.power(entry.transform.translation().y(), 2) | ||
) | ||
if distance < radius: | ||
entities_in_area.append(entry) | ||
|
||
return entities_in_area | ||
|
||
def build_tree( | ||
self, | ||
f: Optional[FlagFilter] = None, | ||
|
@@ -229,6 +265,89 @@ def to_ros_msg(self) -> msg.Map: | |
header = Header(stamp=self.timestamp) | ||
return msg.Map(header=header, entities=entities) | ||
|
||
def lane_marking_left(self) -> Optional[Entity]: | ||
""" | ||
Returns: | ||
+ Optional[Entity]: The left lane marking entity or None if not found | ||
""" | ||
for entry in self.entities: | ||
if entry.flags._is_lanemark is True and entry.position_index == 1: | ||
return entry | ||
return None | ||
|
||
def lane_marking_right(self) -> Optional[Entity]: | ||
""" | ||
Returns: | ||
+ Optional[Entity]: The right lane marking entity or None if not found | ||
""" | ||
for entry in self.entities: | ||
if entry.flags._is_lanemark is True and entry.position_index == -1: | ||
return entry | ||
return None | ||
Comment on lines
+254
to
+272
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Combining those functions into one with a flag for left/right would be nicer |
||
|
||
# Class-level constants | ||
VEHICLE_WIDTH_BUFFER = 1.0 # meters | ||
LANE_CHECK_BUFFER = 0.5 # meters | ||
OBSTACLE_DETECTION_RADIUS = 10.0 # meters | ||
Comment on lines
+274
to
+277
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These should be input parameters to the function |
||
|
||
def check_trajectory(self, local_path: Path) -> int: | ||
""" | ||
Checks the calculated local path on multiple conditions | ||
|
||
Args: | ||
local_path (Path): The path to check | ||
|
||
Returns: | ||
int: Status code indicating the path's validity: | ||
-1: local_path is None or empty | ||
0: Path has no obstacles/infringements | ||
1: Path planned through gap too small for car | ||
2: Path cuts left lane marking | ||
3: Path cuts right lane marking | ||
|
||
Raises: | ||
ValueError: If local_path.poses is empty | ||
""" | ||
if local_path.poses is None or not local_path.poses: | ||
return -1 | ||
|
||
# transform local path to shapely object | ||
trajectory_shapely_points = [] | ||
for pos in local_path.poses: | ||
trajectory_shapely_points.append( | ||
shapely.Point(pos.pose.position.x, pos.pose.position.y) | ||
) | ||
local_trajectory = shapely.LineString(trajectory_shapely_points) | ||
# widen the Linestring to an area representing the cars width | ||
local_trajectory_buffer = local_trajectory.buffer(self.VEHICLE_WIDTH_BUFFER, 3) | ||
|
||
# Check path collision with other entities | ||
obstacles = self.entities_in_area(self.OBSTACLE_DETECTION_RADIUS) | ||
for entry in obstacles: | ||
if not entry.flags._is_lanemark and shapely.intersection( | ||
entry.to_shapely().poly, local_trajectory_buffer | ||
): | ||
return 1 | ||
|
||
# decrease buffer size for lane checking | ||
local_trajectory_buffer = local_trajectory.buffer(self.LANE_CHECK_BUFFER, 3) | ||
|
||
# Check left lane | ||
left_lane = self.lane_marking_left() | ||
if left_lane is not None and shapely.intersection( | ||
left_lane.to_shapely().poly, local_trajectory_buffer | ||
): | ||
return 2 | ||
|
||
# Check right lane | ||
right_lane = self.lane_marking_right() | ||
if right_lane is not None and shapely.intersection( | ||
right_lane.to_shapely().poly, local_trajectory_buffer | ||
): | ||
return 3 | ||
|
||
return 0 | ||
Comment on lines
+279
to
+335
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification 🛠️ Refactor suggestion Verification confirms missing test coverage for critical trajectory validation. The codebase lacks unit tests for the
The magic number '3' in buffer calls is also confirmed to be hardcoded without explanation. 🔗 Analysis chainImprove robustness and maintainability of trajectory checking. The method has several areas for improvement:
+ # Additional class constant
+ BUFFER_QUADRANT_SEGMENTS = 3 # Number of segments per quadrant for buffer operations
+
def check_trajectory(self, local_path: Path) -> int:
# ... existing docstring ...
local_trajectory = shapely.LineString(trajectory_shapely_points)
- local_trajectory_buffer = local_trajectory.buffer(self.VEHICLE_WIDTH_BUFFER, 3)
+ local_trajectory_buffer = local_trajectory.buffer(
+ self.VEHICLE_WIDTH_BUFFER,
+ self.BUFFER_QUADRANT_SEGMENTS
+ )
# ... rest of the method ...
- local_trajectory_buffer = local_trajectory.buffer(self.LANE_CHECK_BUFFER, 3)
+ local_trajectory_buffer = local_trajectory.buffer(
+ self.LANE_CHECK_BUFFER,
+ self.BUFFER_QUADRANT_SEGMENTS
+ ) Let me verify the test coverage: 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Search for existing test cases
echo "=== Searching for trajectory tests ==="
rg "def test.*trajectory" --type py
# Check content of test files
echo -e "\n=== Checking test files ==="
fd "test_.*\.py$" -x cat {}
Length of output: 18773 |
||
|
||
|
||
@dataclass(init=False) | ||
class MapTree: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The changes in this file seem unnecessary. (lidar_data is already set to None as part of the class definition) |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is not a "main" node of the car, I would recommend putting this in a test subfolder inside the src directory |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
#!/usr/bin/env python | ||
import ros_compatibility as roscomp | ||
import rospy | ||
from geometry_msgs.msg import PoseStamped | ||
from ros_compatibility.node import CompatibleNode | ||
from nav_msgs.msg import Path | ||
import random | ||
|
||
|
||
class TestPath(CompatibleNode): | ||
|
||
def __init__(self): | ||
""" | ||
This node handles the translation from the static main frame to the | ||
moving hero frame. The hero frame always moves and rotates as the | ||
ego vehicle does. The hero frame is used by sensors like the lidar. | ||
Rviz also uses the hero frame. The main frame is used for planning. | ||
""" | ||
super(TestPath, self).__init__("TestPath") | ||
self.loginfo("TestPath node started") | ||
self.local_trajectory = Path() | ||
|
||
self.publisher = self.new_publisher( | ||
msg_type=Path, | ||
topic="test_trajectory", | ||
qos_profile=1, | ||
) | ||
|
||
self.rate = 0.5 | ||
self.new_timer(0.5, self.generate_trajectory) | ||
|
||
def generate_trajectory(self, timer_event=None) -> Path: | ||
path_msg = Path() | ||
path_msg.header = rospy.Header() | ||
path_msg.header.stamp = rospy.Time.now() | ||
path_msg.header.frame_id = "hero" | ||
path_msg.poses = [] | ||
curve = 0 | ||
factor = 1 | ||
if random.uniform(-1, 1) < 0: | ||
factor = -1 | ||
|
||
for i in range(0, 10): | ||
pos = PoseStamped() | ||
pos.header.stamp = rospy.Time.now() | ||
pos.header.frame_id = "hero" | ||
curve = curve + 0.01 * i * random.uniform(0, 1) | ||
pos.pose.position.x = i | ||
pos.pose.position.y = curve * factor | ||
pos.pose.position.z = 0 | ||
pos.pose.orientation.w = 1 | ||
path_msg.poses.append(pos) | ||
|
||
self.local_trajectory = path_msg | ||
self.publisher.publish(path_msg) | ||
|
||
|
||
if __name__ == "__main__": | ||
name = "path_test_node" | ||
roscomp.init(name) | ||
node = TestPath() | ||
node.spin() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Several remarks: