Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

631 feature execution check function for planned local trajectory #654

Closed
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
48ae661
Merge current main and [Feature]: calculate positions of lane marking…
THDThieleT Jan 16, 2025
576000a
removed debugger from lane_position node
THDThieleT Jan 16, 2025
f0441ad
reset to old value
THDThieleT Jan 16, 2025
46ce25f
WIP adding function for getting lane on the left side of the car
THDThieleT Jan 16, 2025
a674d18
added functions for lanemarkings left and right, PRE shapely
THDThieleT Jan 17, 2025
18490b3
Merge main with shapely into this branch
EyMaxl Jan 20, 2025
c1424e4
added function head for local trajectory check
THDThieleT Jan 20, 2025
1b07c0d
first try for transforming path to shapely object
THDThieleT Jan 20, 2025
c84dfe3
node that provides test path for testing the map function
THDThieleT Jan 20, 2025
c8d2cb8
fixed import error from Path message type
THDThieleT Jan 22, 2025
9d463c5
added node vor publishing test trajectory
THDThieleT Jan 22, 2025
467fca5
current WIP
THDThieleT Jan 22, 2025
3e98b13
Merge remote-tracking branch 'origin/582-feature-calculate-positions-…
Jan 23, 2025
b32a303
check trajectory function succesfully checks if lane marking is cros…
Jan 23, 2025
6fbb236
temporary adjustments for visualizing new map features
Jan 23, 2025
3a0f48c
test path is now dynamic and swiches sides
Jan 23, 2025
118e856
set lidar Z distance back to -inf for lane detection to work better
Jan 23, 2025
a5d498f
fixed Syntax error
THDThieleT Jan 24, 2025
b603b1a
added collision check for local path and parameter tuning
THDThieleT Jan 24, 2025
f91c9ac
Merge remote-tracking branch 'origin/main' into 631-feature-execution…
THDThieleT Jan 24, 2025
bcdbbb4
fixed bug where buffer area increases instead of changes
THDThieleT Jan 24, 2025
8cb86c3
[WIP] needs to be removed for PR / reset
THDThieleT Jan 24, 2025
589ee7c
Merge remote-tracking branch 'origin/main' into 631-feature-execution…
THDThieleT Jan 26, 2025
4a63321
update function description
THDThieleT Jan 26, 2025
6db0192
reformatting for linter check
THDThieleT Jan 26, 2025
b5510aa
removed wrong inserted text after merge conflict
THDThieleT Jan 26, 2025
7312f34
try for fixing linter
THDThieleT Jan 26, 2025
2cf5796
removed failure from merge conflict
THDThieleT Jan 26, 2025
7b03578
fixed error after renaming variable
THDThieleT Jan 26, 2025
4182c57
added comments and fixed return type of get_lanemarking for linter
THDThieleT Jan 27, 2025
aecbab9
added map subscriber to test local path
Jan 29, 2025
2b028c9
added path test to launch file
Jan 29, 2025
4141310
always print check status
seitzseb Jan 29, 2025
2fa2121
Merge remote-tracking branch 'origin/main' into 631-feature-execution…
seitzseb Jan 29, 2025
9c6207c
reset debug option for map.py to False
THDThieleT Jan 29, 2025
ac8038b
removed path_test from launch file
THDThieleT Feb 11, 2025
ccac3ef
Merge remote-tracking branch 'origin/Main' into 631-feature-execution…
THDThieleT Feb 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion code/mapping/ext_modules/.debug_enabled
Original file line number Diff line number Diff line change
@@ -1 +1 @@
False
True
119 changes: 119 additions & 0 deletions code/mapping/ext_modules/mapping_common/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import shapely
from shapely import STRtree
import shapely.plotting
import numpy as np
import numpy.typing as npt

Expand All @@ -15,6 +16,7 @@
from shapely.geometry import Polygon, LineString

from mapping import msg
from nav_msgs.msg import Path


@dataclass
Expand Down Expand Up @@ -176,6 +178,40 @@ def get_entities_with_coverage(self, polygon, entities: List[Entity], coverage):

return collision_entities

def entities_in_area(self, radius, only_in_front=True) -> List[Entity]:
"""Returns the entities without the hero car
in a given radius around the hero car.
Flag only_in_front decides if entities
behind the car, are returend too.
Only checks if the first entity is_hero

Returns:
List[Entity]: Entities in front and/or back of hero
without the hero car in given radius
"""
entities = self.entities_without_hero()
entities_in_area = []

if only_in_front:
for entry in entities:
distance = np.sqrt(
np.power(entry.transform.translation().x(), 2)
+ np.power(entry.transform.translation().y(), 2)
)
if entry.transform.translation().x() > 0 and distance < radius:
entities_in_area.append(entry)
return entities_in_area
else:
for entry in entities:
distance = np.sqrt(
np.power(entry.transform.translation().x(), 2)
+ np.power(entry.transform.translation().y(), 2)
)
if distance < radius:
entities_in_area.append(entry)

return entities_in_area
Comment on lines +167 to +199
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several remarks:

  • Should be moved into the MapTree class
  • This function seems to not take into account the shapes of the entities. It is also a duplicate of the same name function above. It should use the Circle polygon generated there and then filter these entities with MapTree.get_overlapping_entities()


def build_tree(
self,
f: Optional[FlagFilter] = None,
Expand Down Expand Up @@ -229,6 +265,89 @@ def to_ros_msg(self) -> msg.Map:
header = Header(stamp=self.timestamp)
return msg.Map(header=header, entities=entities)

def lane_marking_left(self) -> Optional[Entity]:
"""
Returns:
+ Optional[Entity]: The left lane marking entity or None if not found
"""
for entry in self.entities:
if entry.flags._is_lanemark is True and entry.position_index == 1:
return entry
return None

def lane_marking_right(self) -> Optional[Entity]:
"""
Returns:
+ Optional[Entity]: The right lane marking entity or None if not found
"""
for entry in self.entities:
if entry.flags._is_lanemark is True and entry.position_index == -1:
return entry
return None
Comment on lines +254 to +272
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combining those functions into one with a flag for left/right would be nicer


# Class-level constants
VEHICLE_WIDTH_BUFFER = 1.0 # meters
LANE_CHECK_BUFFER = 0.5 # meters
OBSTACLE_DETECTION_RADIUS = 10.0 # meters
Comment on lines +274 to +277
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be input parameters to the function


def check_trajectory(self, local_path: Path) -> int:
"""
Checks the calculated local path on multiple conditions

Args:
local_path (Path): The path to check

Returns:
int: Status code indicating the path's validity:
-1: local_path is None or empty
0: Path has no obstacles/infringements
1: Path planned through gap too small for car
2: Path cuts left lane marking
3: Path cuts right lane marking

Raises:
ValueError: If local_path.poses is empty
"""
if local_path.poses is None or not local_path.poses:
return -1

# transform local path to shapely object
trajectory_shapely_points = []
for pos in local_path.poses:
trajectory_shapely_points.append(
shapely.Point(pos.pose.position.x, pos.pose.position.y)
)
local_trajectory = shapely.LineString(trajectory_shapely_points)
# widen the Linestring to an area representing the cars width
local_trajectory_buffer = local_trajectory.buffer(self.VEHICLE_WIDTH_BUFFER, 3)

# Check path collision with other entities
obstacles = self.entities_in_area(self.OBSTACLE_DETECTION_RADIUS)
for entry in obstacles:
if not entry.flags._is_lanemark and shapely.intersection(
entry.to_shapely().poly, local_trajectory_buffer
):
return 1

# decrease buffer size for lane checking
local_trajectory_buffer = local_trajectory.buffer(self.LANE_CHECK_BUFFER, 3)

# Check left lane
left_lane = self.lane_marking_left()
if left_lane is not None and shapely.intersection(
left_lane.to_shapely().poly, local_trajectory_buffer
):
return 2

# Check right lane
right_lane = self.lane_marking_right()
if right_lane is not None and shapely.intersection(
right_lane.to_shapely().poly, local_trajectory_buffer
):
return 3

return 0
Comment on lines +279 to +335
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

🛠️ Refactor suggestion

Verification confirms missing test coverage for critical trajectory validation.

The codebase lacks unit tests for the check_trajectory method, which is responsible for crucial safety checks including:

  • Path validation
  • Obstacle collision detection
  • Lane marking violation checks

The magic number '3' in buffer calls is also confirmed to be hardcoded without explanation.

🔗 Analysis chain

Improve robustness and maintainability of trajectory checking.

The method has several areas for improvement:

  1. Extract magic number '3' in buffer calls into a constant
  2. Simplify nested conditions
  3. Add comprehensive unit tests
+    # Additional class constant
+    BUFFER_QUADRANT_SEGMENTS = 3  # Number of segments per quadrant for buffer operations
+
     def check_trajectory(self, local_path: Path) -> int:
         # ... existing docstring ...
         local_trajectory = shapely.LineString(trajectory_shapely_points)
-        local_trajectory_buffer = local_trajectory.buffer(self.VEHICLE_WIDTH_BUFFER, 3)
+        local_trajectory_buffer = local_trajectory.buffer(
+            self.VEHICLE_WIDTH_BUFFER, 
+            self.BUFFER_QUADRANT_SEGMENTS
+        )
         
         # ... rest of the method ...
-        local_trajectory_buffer = local_trajectory.buffer(self.LANE_CHECK_BUFFER, 3)
+        local_trajectory_buffer = local_trajectory.buffer(
+            self.LANE_CHECK_BUFFER, 
+            self.BUFFER_QUADRANT_SEGMENTS
+        )

Let me verify the test coverage:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing test cases
echo "=== Searching for trajectory tests ==="
rg "def test.*trajectory" --type py

# Check content of test files
echo -e "\n=== Checking test files ==="
fd "test_.*\.py$" -x cat {}

Length of output: 18773



@dataclass(init=False)
class MapTree:
Expand Down
2 changes: 2 additions & 0 deletions code/mapping/src/mapping_data_integration.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file seem unnecessary. (lidar_data is already set to None as part of the class definition)

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, name, **kwargs):
callback=self.lanemarkings_callback,
qos_profile=1,
)
self.lidar_data = None
self.new_subscription(
topic=self.get_param("~hero_speed_topic", "/carla/hero/Speed"),
msg_type=CarlaSpeedometer,
Expand Down Expand Up @@ -336,6 +337,7 @@ def publish_new_map(self, timer_event=None):
stamp = rospy.get_rostime()
map = Map(timestamp=stamp, entities=entities)
msg = map.to_ros_msg()

self.map_publisher.publish(msg)


Expand Down
62 changes: 62 additions & 0 deletions code/mapping/src/path_test.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is not a "main" node of the car, I would recommend putting this in a test subfolder inside the src directory

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env python
import ros_compatibility as roscomp
import rospy
from geometry_msgs.msg import PoseStamped
from ros_compatibility.node import CompatibleNode
from nav_msgs.msg import Path
import random


class TestPath(CompatibleNode):

def __init__(self):
"""
This node handles the translation from the static main frame to the
moving hero frame. The hero frame always moves and rotates as the
ego vehicle does. The hero frame is used by sensors like the lidar.
Rviz also uses the hero frame. The main frame is used for planning.
"""
super(TestPath, self).__init__("TestPath")
self.loginfo("TestPath node started")
self.local_trajectory = Path()

self.publisher = self.new_publisher(
msg_type=Path,
topic="test_trajectory",
qos_profile=1,
)

self.rate = 0.5
self.new_timer(0.5, self.generate_trajectory)

def generate_trajectory(self, timer_event=None) -> Path:
path_msg = Path()
path_msg.header = rospy.Header()
path_msg.header.stamp = rospy.Time.now()
path_msg.header.frame_id = "hero"
path_msg.poses = []
curve = 0
factor = 1
if random.uniform(-1, 1) < 0:
factor = -1

for i in range(0, 10):
pos = PoseStamped()
pos.header.stamp = rospy.Time.now()
pos.header.frame_id = "hero"
curve = curve + 0.01 * i * random.uniform(0, 1)
pos.pose.position.x = i
pos.pose.position.y = curve * factor
pos.pose.position.z = 0
pos.pose.orientation.w = 1
path_msg.poses.append(pos)

self.local_trajectory = path_msg
self.publisher.publish(path_msg)


if __name__ == "__main__":
name = "path_test_node"
roscomp.init(name)
node = TestPath()
node.spin()
Loading