Skip to content

Commit

Permalink
cleaned up code
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukasnol committed Dec 18, 2024
1 parent 910bb99 commit cd13e96
Showing 1 changed file with 3 additions and 178 deletions.
181 changes: 3 additions & 178 deletions code/perception/src/Lanedetection_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,13 @@ def __init__(self, name, **kwargs):
self.bridge = CvBridge()
self.image_msg_header = Header()
self.image_msg_header.frame_id = "segmented_image_frame"

self.role_name = self.get_param("role_name", "hero")
# setup subscriptions
self.setup_camera_subscriptions("Center")
self.setup_lane_publisher()
self.setup_dist_array_subscription()
self.setup_overlay_publisher()
# setup publishers
self.setup_lane_publisher()
self.setup_driveable_area_publisher()
self.setup_mask_publisher()

self.marker_publisher = self.new_publisher(
msg_type=MarkerArray,
topic="/paf/hero/visualization_marker_array_lane",
qos_profile=1,
)

def run(self):
self.spin()
Expand Down Expand Up @@ -104,28 +97,6 @@ def setup_driveable_area_publisher(self):
qos_profile=1,
)

def setup_overlay_publisher(self):
"""sets up a publisher for an graphical output with original image, highlighted lane mask
and Driveable Area
topic: /Center/Lane_detect_Overlay
"""
self.lane_overlay_publisher = self.new_publisher(
msg_type=numpy_msg(ImageMsg),
topic=f"/paf/{self.role_name}/Center/Lane_detect_Overlay",
qos_profile=1,
)

def setup_mask_publisher(self):
"""sets up a publisher for an graphical output with original image, highlighted lane mask
and Driveable Area
topic: /Center/Lane_detect_Overlay
"""
self.Lane_detection_publisher = self.new_publisher(
msg_type=numpy_msg(ImageMsg),
topic=f"/paf/{self.role_name}/Center/Lane_detection_mask",
qos_profile=1,
)

def image_handler(self, ImageMsg):
"""
Callback function for image subscriber
Expand All @@ -143,43 +114,11 @@ def image_handler(self, ImageMsg):

ll_seg_scaled, da_seg_scaled = self.postprocess_image(da_seg_out, ll_seg_out)

"""mask = np.expand_dims(ll_seg_scaled, axis=-1)
markers = MarkerArray()
if self.dist_arrays is not None:
disk_mask = np.any(self.dist_arrays, axis=2, keepdims=True)
mask = mask & disk_mask
dist_mask = np.logical_and(
np.any(mask, axis=2, keepdims=True), self.dist_arrays
)
dist = dist_mask * self.dist_arrays
points = dist[np.repeat(disk_mask, 3, axis=2)].reshape(-1, 3)
total_points_counter = 0
wrong_points_counter = 0
for i, point in enumerate(points):
if point[0] > 2:
if point[2] < 2:
total_points_counter += 1
markers.markers.append(self.get_marker(point, i))
else:
total_points_counter += 1
wrong_points_counter += 1
if wrong_points_counter / total_points_counter < 1:
self.marker_publisher.publish(markers)"""

img_msg_mask = self.bridge.cv2_to_imgmsg(ll_seg_scaled, encoding="mono8")
img_msg_mask.header = ImageMsg.header
self.Lane_detection_publisher.publish(img_msg_mask)

# overlay = self.create_top_down_bounding_boxes(ll_seg_scaled, self.dist_arrays)

ros_driveable_area = self.bridge.cv2_to_imgmsg(da_seg_scaled)
ros_lane_mask = self.bridge.cv2_to_imgmsg(ll_seg_scaled)
# ros_lane_overlay = self.bridge.cv2_to_imgmsg(overlay)

# publish
self.lane_mask_publisher.publish(ros_lane_mask)
# self.lane_overlay_publisher.publish(ros_lane_overlay)
self.driveable_area_publisher.publish(ros_driveable_area)

def handle_dist_array(self, dist_array):
Expand Down Expand Up @@ -280,120 +219,6 @@ def postprocess_image(self, da_seg_out, ll_seg_out):

return ll_seg_scaled, da_seg_scaled

""" def create_top_down_bounding_boxes(self, lane_mask, distance_array):
lidar_points = self.filter_lidar_by_lane_mask(lane_mask, distance_array)
clusters = self.cluster_lidar_points(lidar_points)
image = self.create_bounding_box_top_down(clusters)
return image"""

"""def filter_lidar_by_lane_mask(self, lane_mask, distance_array):
""" """Filter the lidar points by lane mask and remove non-lane points.""" """
# Step 1: Create a boolean mask for the points that lie inside the lane mask
lane_mask_bool = lane_mask.astype(
bool
) # Lane mask as boolean (True for lane, False for outside)
# Step 2: Create a copy of the distance_array and set all points outside the lane mask to [0.0, 0.0, 0.0]
filtered_lidar_points = distance_array.copy() # Create a copy to modify
filtered_lidar_points[~lane_mask_bool] = [
0.0,
0.0,
0.0,
] # Set points outside lane mask to [0.0, 0.0, 0.0]
# Step 3: Remove points that are [0.0, 0.0, 0.0]
valid_lidar_points = filtered_lidar_points[
np.all(filtered_lidar_points != [0.0, 0.0, 0.0], axis=2)
]
# Step 4: Extract only x and y coordinates for top-down view
# valid_lidar_points_xy = valid_lidar_points[:, :, :2] # Only take x and y
valid_lidar_points = valid_lidar_points[:, :2]
return valid_lidar_points
"""
"""def cluster_lidar_points(self, lidar_points):
""" """Cluster lidar points using DBSCAN and create a 2D bounding box."""
""" # 1. Extract the 2D coordinates (x, y) from the lidar points
if lidar_points.shape[0] == 0:
return []
# 2. Standardize the coordinates for DBSCAN clustering
scaler = StandardScaler()
valid_points_scaled = scaler.fit_transform(lidar_points[:, :2])
# 3. Apply DBSCAN for clustering
db = DBSCAN(eps=0.5, min_samples=10).fit(valid_points_scaled)
labels = db.labels_
# 4. Find the bounding box around the largest cluster (or all clusters)
clusters = []
unique_labels = np.unique(labels)
for label in unique_labels:
if label == -1:
continue # Ignore noise points
# Extract points corresponding to the current cluster
cluster_points = lidar_points[labels == label]
# Calculate the bounding box (min/max x, y coordinates)
min_x, min_y = np.min(cluster_points, axis=0)
max_x, max_y = np.max(cluster_points, axis=0)
# Store the cluster bounding box
clusters.append((min_x, min_y, max_x, max_y))
return clusters
"""
"""def create_bounding_box_top_down(self, clusters):
""" """Create a top-down view with bounding boxes
drawn around the detected lane clusters.""" """
# Initialize a blank image for the top-down view
length_image = 30 # meters
width_image = 5 # meters
top_down_image = np.zeros((600, 800), dtype=np.uint8)
# Draw bounding boxes for each cluster
for min_x, min_y, max_x, max_y in clusters:
# Convert coordinates to fit the top-down view scale (optional, if needed)
x_min = int(min_x)
y_min = int(min_y)
x_max = int(max_x)
y_max = int(max_y)
y_min = int(y_min / width_image * top_down_image.shape[0])
y_max = int(y_max / width_image * top_down_image.shape[0])
x_min = int(x_min / length_image * top_down_image.shape[1])
x_max = int(x_max / length_image * top_down_image.shape[1])
# Draw the bounding box (white rectangle)
cv2.rectangle(top_down_image, (x_min, y_min), (x_max, y_max), 255, 5)
return top_down_image"""

"""def get_marker(self, point, id):
c_color = [0, 255, 0]
marker = Marker()
marker.header.frame_id = "hero"
marker.header.stamp = rospy.Time.now()
marker.ns = "min_values"
marker.id = id
marker.type = Marker.CUBE
marker.action = Marker.ADD
marker.scale.x = 0.1
marker.scale.y = 0.1
marker.scale.z = 0.1
marker.color.a = 1.0
marker.color.r = c_color[0]
marker.color.g = c_color[1]
marker.color.b = c_color[2]
marker.pose.orientation.x = 0.0
marker.pose.orientation.y = 0.0
marker.pose.orientation.z = 0.0
marker.pose.orientation.w = 1.0
marker.pose.position.x = point[0]
marker.pose.position.y = point[1]
marker.pose.position.z = 1.7 + point[2]
marker.lifetime = rospy.Duration(0.5)
return marker"""


if __name__ == "__main__":
roscomp.init("Lanedetection_node")
Expand Down

0 comments on commit cd13e96

Please sign in to comment.