Skip to content

Commit

Permalink
Added scan using LineIterator
Browse files Browse the repository at this point in the history
Signed-off-by: Jatin Patil <[email protected]>
  • Loading branch information
JatinPatil2003 committed Aug 16, 2024
1 parent b9fa086 commit 79cac35
Showing 1 changed file with 81 additions and 33 deletions.
114 changes: 81 additions & 33 deletions nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from tf2_ros import Buffer, TransformBroadcaster, TransformListener
import tf_transformations

# from nav2_simple_commander.line_iterator import LineIterator

from .utils import (
addYawToQuat,
getMapOccupancy,
Expand All @@ -36,6 +38,8 @@
worldToMap,
)

LETHAL_OBSTACLE = 100

"""
This is a loopback simulator that replaces a physics simulator to create a
frictionless, inertialess, and collisionless simulation environment. It
Expand All @@ -56,6 +60,7 @@ def __init__(self):
self.timer = None
self.setupTimer = None
self.map = None
self.mat_base_to_laser = None

self.declare_parameter('update_duration', 0.01)
self.update_dur = self.get_parameter('update_duration').get_parameter_value().double_value
Expand All @@ -72,13 +77,13 @@ def __init__(self):
self.declare_parameter('scan_frame_id', 'base_scan')
self.scan_frame_id = self.get_parameter('scan_frame_id').get_parameter_value().string_value

self.declare_parameter('publish_map_odom_tf', False)
self.declare_parameter('publish_map_odom_tf', True)
self.publish_map_odom_tf = self.get_parameter(
'publish_map_odom_tf').get_parameter_value().bool_value

self.declare_parameter('scan_publish_rate', 10.0)
self.scan_publish_rate = self.get_parameter(
'scan_publish_rate').get_parameter_value().double_value
self.declare_parameter('scan_publish_dur', 0.1)
self.scan_publish_dur = self.get_parameter(
'scan_publish_dur').get_parameter_value().double_value

self.t_map_to_odom = TransformStamped()
self.t_map_to_odom.header.frame_id = self.map_frame_id
Expand Down Expand Up @@ -114,10 +119,22 @@ def __init__(self):

self.info('Loopback simulator initialized')

def getBaseToLaserTf(self):
try:
# Wait for transform and lookup
transform = self.tf_buffer.lookup_transform(
self.base_frame_id, self.scan_frame_id, rclpy.time.Time())
self.mat_base_to_laser = transformStampedToMatrix(transform)

except Exception as ex:
self.get_logger().error('Transform lookup failed: %s' % str(ex))

def setupTimerCallback(self):
# Publish initial identity odom transform & laser scan to warm up system
self.tf_broadcaster.sendTransform(self.t_odom_to_base_link)
self.publishLaserScan()
if self.mat_base_to_laser is None:
self.mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_link)
self.getBaseToLaserTf()

def cmdVelCallback(self, msg):
self.debug('Received cmd_vel')
Expand All @@ -139,14 +156,15 @@ def initialPoseCallback(self, msg):
self.t_odom_to_base_link.transform.translation = Vector3()
self.t_odom_to_base_link.transform.rotation = Quaternion()
self.publishTransforms(self.t_map_to_odom, self.t_odom_to_base_link)
self.mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odom)

# Start republication timer and velocity processing
if self.setupTimer is not None:
self.setupTimer.cancel()
self.setupTimer.destroy()
self.setupTimer = None
self.timer = self.create_timer(self.update_dur, self.timerCallback)
self.timer_laser = self.create_timer((1/self.scan_publish_rate), self.publishLaserScan)
self.timer_laser = self.create_timer(self.scan_publish_dur, self.publishLaserScan)
return

self.initial_pose = msg.pose.pose
Expand Down Expand Up @@ -190,6 +208,8 @@ def timerCallback(self):

self.publishTransforms(self.t_map_to_odom, self.t_odom_to_base_link)
self.publishOdometry(self.t_odom_to_base_link)
self.mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odom)
self.mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_link)

def publishLaserScan(self):
# Publish a bogus laser scan for collision monitor
Expand All @@ -208,8 +228,7 @@ def publishLaserScan(self):
(self.scan_msg.angle_max - self.scan_msg.angle_min) /
self.scan_msg.angle_increment)
self.scan_msg.ranges = [0.0] * num_samples
x, y, theta = self.getLaserPose()
self.getLaserScan(num_samples, x, y, theta)
self.getLaserScan(num_samples)
self.scan_pub.publish(self.scan_msg)

def publishTransforms(self, map_to_odom, odom_to_base_link):
Expand Down Expand Up @@ -260,34 +279,31 @@ def getMap(self):
)

def getLaserPose(self):
try:
# Wait for transform and lookup
now = rclpy.time.Time()
transform = self.tf_buffer.lookup_transform(
'map', self.scan_frame_id, now, rclpy.duration.Duration(seconds=0.1))

# Extract pose information
x = transform.transform.translation.x
y = transform.transform.translation.y
rotation = transform.transform.rotation
theta = tf_transformations.euler_from_quaternion([
rotation.x,
rotation.y,
rotation.z,
rotation.w
])[2]

return x, y, theta

except Exception as ex:
self.get_logger().error('Transform lookup failed: %s' % str(ex))
return 0.0, 0.0, 0.0

def getLaserScan(self, num_samples, x, y, theta):
if self.map is None or self.initial_pose is None:
mat_map_to_laser = tf_transformations.concatenate_matrices(
self.mat_map_to_odom,
self.mat_odom_to_base,
self.mat_base_to_laser
)
transform = matrixToTransform(mat_map_to_laser)

x = transform.translation.x
y = transform.translation.y
theta = tf_transformations.euler_from_quaternion([
transform.rotation.x,
transform.rotation.y,
transform.rotation.z,
transform.rotation.w
])[2]

return x, y, theta

def getLaserScan(self, num_samples):
if self.map is None or self.initial_pose is None or self.mat_base_to_laser is None:
self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples
return

x, y, theta = self.getLaserPose()

mx, my = worldToMap(x, y, self.map)

if not mx and not my:
Expand All @@ -298,6 +314,38 @@ def getLaserScan(self, num_samples, x, y, theta):
curr_angle = theta + self.scan_msg.angle_min + i * self.scan_msg.angle_increment
self.scan_msg.ranges[i] = self.findMapRange(mx, my, x, y, curr_angle)

# def getLaserScan(self, num_samples):
# if self.map is None or self.initial_pose is None or self.mat_base_to_laser is None:
# self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples
# return

# x0, y0 ,theta = self.getLaserPose()

# max_distance = math.sqrt(
# (self.map.info.width * self.map.info.resolution) ** 2 +
# (self.map.info.height * self.map.info.resolution) ** 2)
# for i in range(num_samples):
# curr_angle = theta + self.scan_msg.angle_min + i * self.scan_msg.angle_increment
# x1 = float(x0) + max_distance * math.cos(curr_angle)
# y1 = float(y0) + max_distance * math.sin(curr_angle)

# line_iterator = LineIterator(float(x0), float(y0), x1, y1, self.map.info.resolution)

# while line_iterator.isValid():
# mx, my = worldToMap(line_iterator.getX(), line_iterator.getY(), self.map)

# if not mx and not my:
# break

# point_cost = getMapOccupancy(mx, my, self.map)

# if point_cost >= 60:
# self.scan_msg.ranges[i] = math.sqrt(
# (line_iterator.getX() - x0) ** 2 + (line_iterator.getY() - y0) ** 2)
# break

# line_iterator.advance()

def findMapRange(self, mx, my, x, y, theta):
# Using "A Faster Voxel Traversal Algorithm for Ray Tracing" by Amanatides & Woo
# ======== Initialization Phase ========
Expand Down

0 comments on commit 79cac35

Please sign in to comment.