Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added laser data from map in nav2_loopback_sim #4617

Merged
merged 17 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 225 additions & 18 deletions nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@
from geometry_msgs.msg import PoseWithCovarianceStamped, Twist
from geometry_msgs.msg import Quaternion, TransformStamped, Vector3
from nav_msgs.msg import Odometry
from nav_msgs.srv import GetMap
import rclpy
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import LaserScan
from tf2_ros import TransformBroadcaster
from tf2_ros import Buffer, TransformBroadcaster, TransformListener
import tf_transformations

from .utils import addYawToQuat, matrixToTransform, transformStampedToMatrix
# from nav2_simple_commander.line_iterator import LineIterator

from .utils import (
addYawToQuat,
getMapOccupancy,
mapToWorld,
matrixToTransform,
transformStampedToMatrix,
worldToMap,
)

LETHAL_OBSTACLE = 100

"""
This is a loopback simulator that replaces a physics simulator to create a
Expand All @@ -48,6 +59,8 @@ def __init__(self):
self.initial_pose = None
self.timer = None
self.setupTimer = None
self.map = None
self.mat_base_to_laser = None

self.declare_parameter('update_duration', 0.01)
self.update_dur = self.get_parameter('update_duration').get_parameter_value().double_value
Expand All @@ -64,6 +77,14 @@ def __init__(self):
self.declare_parameter('scan_frame_id', 'base_scan')
self.scan_frame_id = self.get_parameter('scan_frame_id').get_parameter_value().string_value

self.declare_parameter('publish_map_odom_tf', True)
self.publish_map_odom_tf = self.get_parameter(
'publish_map_odom_tf').get_parameter_value().bool_value

self.declare_parameter('scan_publish_dur', 0.1)
self.scan_publish_dur = self.get_parameter(
'scan_publish_dur').get_parameter_value().double_value

self.t_map_to_odom = TransformStamped()
self.t_map_to_odom.header.frame_id = self.map_frame_id
self.t_map_to_odom.child_frame_id = self.odom_frame_id
Expand All @@ -88,12 +109,32 @@ def __init__(self):
self.scan_pub = self.create_publisher(LaserScan, 'scan', sensor_qos)

self.setupTimer = self.create_timer(0.1, self.setupTimerCallback)

self.map_client = self.create_client(GetMap, '/map_server/map')

self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)

self.getMap()

self.info('Loopback simulator initialized')

def getBaseToLaserTf(self):
try:
# Wait for transform and lookup
transform = self.tf_buffer.lookup_transform(
self.base_frame_id, self.scan_frame_id, rclpy.time.Time())
self.mat_base_to_laser = transformStampedToMatrix(transform)

except Exception as ex:
self.get_logger().error('Transform lookup failed: %s' % str(ex))

def setupTimerCallback(self):
# Publish initial identity odom transform & laser scan to warm up system
self.tf_broadcaster.sendTransform(self.t_odom_to_base_link)
self.publishLaserScan()
if self.mat_base_to_laser is None:
self.mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_link)
self.getBaseToLaserTf()

def cmdVelCallback(self, msg):
self.debug('Received cmd_vel')
Expand All @@ -115,13 +156,15 @@ def initialPoseCallback(self, msg):
self.t_odom_to_base_link.transform.translation = Vector3()
self.t_odom_to_base_link.transform.rotation = Quaternion()
self.publishTransforms(self.t_map_to_odom, self.t_odom_to_base_link)
self.mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odom)
JatinPatil2003 marked this conversation as resolved.
Show resolved Hide resolved

# Start republication timer and velocity processing
if self.setupTimer is not None:
self.setupTimer.cancel()
self.setupTimer.destroy()
self.setupTimer = None
self.timer = self.create_timer(self.update_dur, self.timerCallback)
self.timer_laser = self.create_timer(self.scan_publish_dur, self.publishLaserScan)
return

self.initial_pose = msg.pose.pose
Expand Down Expand Up @@ -165,29 +208,35 @@ def timerCallback(self):

self.publishTransforms(self.t_map_to_odom, self.t_odom_to_base_link)
self.publishOdometry(self.t_odom_to_base_link)
self.publishLaserScan()
self.mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odom)
JatinPatil2003 marked this conversation as resolved.
Show resolved Hide resolved
self.mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_link)
JatinPatil2003 marked this conversation as resolved.
Show resolved Hide resolved

def publishLaserScan(self):
# Publish a bogus laser scan for collision monitor
scan = LaserScan()
# scan.header.stamp = (self.get_clock().now()).to_msg()
scan.header.frame_id = self.scan_frame_id
scan.angle_min = -math.pi
scan.angle_max = math.pi
scan.angle_increment = 0.005817705996 # 0.333 degrees
scan.time_increment = 0.0
scan.scan_time = 0.1
scan.range_min = 0.1
scan.range_max = 100.0
num_samples = int((scan.angle_max - scan.angle_min) / scan.angle_increment)
scan.ranges = [scan.range_max - 0.01] * num_samples
self.scan_pub.publish(scan)
self.scan_msg = LaserScan()
self.scan_msg.header.stamp = (self.get_clock().now()).to_msg()
JatinPatil2003 marked this conversation as resolved.
Show resolved Hide resolved
SteveMacenski marked this conversation as resolved.
Show resolved Hide resolved
self.scan_msg.header.frame_id = self.scan_frame_id
self.scan_msg.angle_min = -math.pi
self.scan_msg.angle_max = math.pi
# 1.5 degrees
self.scan_msg.angle_increment = 0.0261799
self.scan_msg.time_increment = 0.0
self.scan_msg.scan_time = 0.1
self.scan_msg.range_min = 0.05
self.scan_msg.range_max = 100.0
SteveMacenski marked this conversation as resolved.
Show resolved Hide resolved
num_samples = int(
(self.scan_msg.angle_max - self.scan_msg.angle_min) /
self.scan_msg.angle_increment)
self.scan_msg.ranges = [0.0] * num_samples
self.getLaserScan(num_samples)
self.scan_pub.publish(self.scan_msg)

def publishTransforms(self, map_to_odom, odom_to_base_link):
map_to_odom.header.stamp = \
(self.get_clock().now() + Duration(seconds=self.update_dur)).to_msg()
odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
self.tf_broadcaster.sendTransform(map_to_odom)
if self.publish_map_odom_tf:
self.tf_broadcaster.sendTransform(map_to_odom)
self.tf_broadcaster.sendTransform(odom_to_base_link)

def publishOdometry(self, odom_to_base_link):
Expand All @@ -209,6 +258,164 @@ def debug(self, msg):
self.get_logger().debug(msg)
return

def getMap(self):
request = GetMap.Request()
if self.map_client.wait_for_service(timeout_sec=5.0):
# Request to get map
future = self.map_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
self.map = future.result().map
self.get_logger().info('Laser scan will be populated using map data')
else:
self.get_logger().warn(
'Failed to get map, '
'Laser scan will be populated using max range'
)
else:
self.get_logger().warn(
'Failed to get map, '
'Laser scan will be populated using max range'
)

def getLaserPose(self):
mat_map_to_laser = tf_transformations.concatenate_matrices(
self.mat_map_to_odom,
JatinPatil2003 marked this conversation as resolved.
Show resolved Hide resolved
self.mat_odom_to_base,
self.mat_base_to_laser
)
transform = matrixToTransform(mat_map_to_laser)

x = transform.translation.x
y = transform.translation.y
theta = tf_transformations.euler_from_quaternion([
transform.rotation.x,
transform.rotation.y,
transform.rotation.z,
transform.rotation.w
])[2]

return x, y, theta

def getLaserScan(self, num_samples):
if self.map is None or self.initial_pose is None or self.mat_base_to_laser is None:
self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples
return

x, y, theta = self.getLaserPose()

mx, my = worldToMap(x, y, self.map)

if not mx and not my:
self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples
return

for i in range(int(num_samples)):
curr_angle = theta + self.scan_msg.angle_min + i * self.scan_msg.angle_increment
self.scan_msg.ranges[i] = self.findMapRange(mx, my, x, y, curr_angle)
SteveMacenski marked this conversation as resolved.
Show resolved Hide resolved

# def getLaserScan(self, num_samples):
# if self.map is None or self.initial_pose is None or self.mat_base_to_laser is None:
# self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples
# return

# x0, y0 ,theta = self.getLaserPose()

# max_distance = math.sqrt(
# (self.map.info.width * self.map.info.resolution) ** 2 +
# (self.map.info.height * self.map.info.resolution) ** 2)
# max_distance = min(max_distance, self.scan_msg.range_max)
# for i in range(num_samples):
# curr_angle = theta + self.scan_msg.angle_min + i * self.scan_msg.angle_increment
# x1 = float(x0) + max_distance * math.cos(curr_angle)
# y1 = float(y0) + max_distance * math.sin(curr_angle)

# line_iterator = LineIterator(float(x0), float(y0), x1, y1, self.map.info.resolution)

# while line_iterator.isValid():
# mx, my = worldToMap(line_iterator.getX(), line_iterator.getY(), self.map)

# if not mx and not my:
# break

# point_cost = getMapOccupancy(mx, my, self.map)

# if point_cost >= 60:
# self.scan_msg.ranges[i] = math.sqrt(
# (line_iterator.getX() - x0) ** 2 + (line_iterator.getY() - y0) ** 2)
# break

# line_iterator.advance()

def findMapRange(self, mx, my, x, y, theta):
# Using "A Faster Voxel Traversal Algorithm for Ray Tracing" by Amanatides & Woo
# ======== Initialization Phase ========
origin = [x, y] # u
direction = [math.cos(theta), math.sin(theta)] # v

current = [mx, my] # X, Y
step = [0, 0] # stepX, stepY
tMax = [0.0, 0.0] # tMaxX, tMaxY
tDelta = [0.0, 0.0] # tDeltaX, tDeltaY

voxel_border = [0.0, 0.0]
voxel_border[0], voxel_border[1] = mapToWorld(
current[0], current[1], self.map)

for i in range(2): # For each dimension (x, y)
# Determine step direction
if direction[i] > 0.0:
step[i] = 1
elif direction[i] < 0.0:
step[i] = -1
else:
step[i] = 0

# Determine tMax, tDelta
if step[i] != 0:
if step[i] == 1:
voxel_border[i] += step[i] * self.map.info.resolution

# tMax - voxel boundary crossing
tMax[i] = (voxel_border[i] - origin[i]) / direction[i]
# tDelta - distance along ray
# so that vertical/horizontal component equals one voxel
tDelta[i] = self.map.info.resolution / abs(direction[i])
else:
tMax[i] = float('inf')
tDelta[i] = float('inf')

# ======== Incremental Traversal ========
while True:
# Advance
dim = 0 if tMax[0] < tMax[1] else 1

# Advance one voxel
current[dim] += step[dim]
tMax[dim] += tDelta[dim]

# Check map inbounds
if (current[0] < 0 or current[0] >= self.map.info.width or
current[1] < 0 or current[1] >= self.map.info.height):
return self.scan_msg.range_max - 0.1

# Determine current range
current_range = math.sqrt(
(current[0] - mx) ** 2 + (current[1] - my) ** 2
) * self.map.info.resolution

# Are we at max range?
if current_range > self.scan_msg.range_max:
return self.scan_msg.range_max - 0.1
else:
occ = getMapOccupancy(current[0], current[1], self.map)
if occ >= 60: # Current cell is occupied
# Is range less than min range
if current_range < self.scan_msg.range_min:
return 0.0

return current_range


def main():
rclpy.init()
Expand Down
25 changes: 25 additions & 0 deletions nav2_loopback_sim/nav2_loopback_sim/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.


import math

from geometry_msgs.msg import Quaternion, Transform
import numpy as np
import tf_transformations
Expand Down Expand Up @@ -63,3 +65,26 @@ def matrixToTransform(matrix):
transform.rotation.z = quaternion[2]
transform.rotation.w = quaternion[3]
return transform


def worldToMap(world_x, world_y, map_msg):
# Check if world coordinates are out of bounds
if (world_x < map_msg.info.origin.position.x or world_y < map_msg.info.origin.position.y):
return None, None # Coordinates are out of bounds

map_x = int(math.floor((world_x - map_msg.info.origin.position.x) / map_msg.info.resolution))
map_y = int(math.floor((world_y - map_msg.info.origin.position.y) / map_msg.info.resolution))

if not map_x < map_msg.info.width or not map_y < map_msg.info.height:
return None, None # Coordinates are out of bounds
return map_x, map_y


def mapToWorld(map_x, map_y, map_msg):
world_x = map_msg.info.origin.position.x + ((map_x + 0.5) * map_msg.info.resolution)
world_y = map_msg.info.origin.position.y + ((map_y + 0.5) * map_msg.info.resolution)
return world_x, world_y


def getMapOccupancy(x, y, map_msg):
return map_msg.data[y * map_msg.info.width + x]
Loading