diff --git a/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py b/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py index 94521c2fdd..54e83cbb6e 100644 --- a/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py +++ b/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py @@ -27,6 +27,8 @@ from tf2_ros import Buffer, TransformBroadcaster, TransformListener import tf_transformations +# from nav2_simple_commander.line_iterator import LineIterator + from .utils import ( addYawToQuat, getMapOccupancy, @@ -36,6 +38,8 @@ worldToMap, ) +LETHAL_OBSTACLE = 100 + """ This is a loopback simulator that replaces a physics simulator to create a frictionless, inertialess, and collisionless simulation environment. It @@ -56,6 +60,7 @@ def __init__(self): self.timer = None self.setupTimer = None self.map = None + self.mat_base_to_laser = None self.declare_parameter('update_duration', 0.01) self.update_dur = self.get_parameter('update_duration').get_parameter_value().double_value @@ -72,13 +77,13 @@ def __init__(self): self.declare_parameter('scan_frame_id', 'base_scan') self.scan_frame_id = self.get_parameter('scan_frame_id').get_parameter_value().string_value - self.declare_parameter('publish_map_odom_tf', False) + self.declare_parameter('publish_map_odom_tf', True) self.publish_map_odom_tf = self.get_parameter( 'publish_map_odom_tf').get_parameter_value().bool_value - self.declare_parameter('scan_publish_rate', 10.0) - self.scan_publish_rate = self.get_parameter( - 'scan_publish_rate').get_parameter_value().double_value + self.declare_parameter('scan_publish_dur', 0.1) + self.scan_publish_dur = self.get_parameter( + 'scan_publish_dur').get_parameter_value().double_value self.t_map_to_odom = TransformStamped() self.t_map_to_odom.header.frame_id = self.map_frame_id @@ -114,10 +119,22 @@ def __init__(self): self.info('Loopback simulator initialized') + def getBaseToLaserTf(self): + try: + # Wait for transform and lookup + transform = self.tf_buffer.lookup_transform( + self.base_frame_id, self.scan_frame_id, rclpy.time.Time()) + self.mat_base_to_laser = transformStampedToMatrix(transform) + + except Exception as ex: + self.get_logger().error('Transform lookup failed: %s' % str(ex)) + def setupTimerCallback(self): # Publish initial identity odom transform & laser scan to warm up system self.tf_broadcaster.sendTransform(self.t_odom_to_base_link) - self.publishLaserScan() + if self.mat_base_to_laser is None: + self.mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_link) + self.getBaseToLaserTf() def cmdVelCallback(self, msg): self.debug('Received cmd_vel') @@ -139,6 +156,7 @@ def initialPoseCallback(self, msg): self.t_odom_to_base_link.transform.translation = Vector3() self.t_odom_to_base_link.transform.rotation = Quaternion() self.publishTransforms(self.t_map_to_odom, self.t_odom_to_base_link) + self.mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odom) # Start republication timer and velocity processing if self.setupTimer is not None: @@ -146,7 +164,7 @@ def initialPoseCallback(self, msg): self.setupTimer.destroy() self.setupTimer = None self.timer = self.create_timer(self.update_dur, self.timerCallback) - self.timer_laser = self.create_timer((1/self.scan_publish_rate), self.publishLaserScan) + self.timer_laser = self.create_timer(self.scan_publish_dur, self.publishLaserScan) return self.initial_pose = msg.pose.pose @@ -190,6 +208,8 @@ def timerCallback(self): self.publishTransforms(self.t_map_to_odom, self.t_odom_to_base_link) self.publishOdometry(self.t_odom_to_base_link) + self.mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odom) + self.mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_link) def publishLaserScan(self): # Publish a bogus laser scan for collision monitor @@ -208,8 +228,7 @@ def publishLaserScan(self): (self.scan_msg.angle_max - self.scan_msg.angle_min) / self.scan_msg.angle_increment) self.scan_msg.ranges = [0.0] * num_samples - x, y, theta = self.getLaserPose() - self.getLaserScan(num_samples, x, y, theta) + self.getLaserScan(num_samples) self.scan_pub.publish(self.scan_msg) def publishTransforms(self, map_to_odom, odom_to_base_link): @@ -260,34 +279,31 @@ def getMap(self): ) def getLaserPose(self): - try: - # Wait for transform and lookup - now = rclpy.time.Time() - transform = self.tf_buffer.lookup_transform( - 'map', self.scan_frame_id, now, rclpy.duration.Duration(seconds=0.1)) - - # Extract pose information - x = transform.transform.translation.x - y = transform.transform.translation.y - rotation = transform.transform.rotation - theta = tf_transformations.euler_from_quaternion([ - rotation.x, - rotation.y, - rotation.z, - rotation.w - ])[2] - - return x, y, theta - - except Exception as ex: - self.get_logger().error('Transform lookup failed: %s' % str(ex)) - return 0.0, 0.0, 0.0 - - def getLaserScan(self, num_samples, x, y, theta): - if self.map is None or self.initial_pose is None: + mat_map_to_laser = tf_transformations.concatenate_matrices( + self.mat_map_to_odom, + self.mat_odom_to_base, + self.mat_base_to_laser + ) + transform = matrixToTransform(mat_map_to_laser) + + x = transform.translation.x + y = transform.translation.y + theta = tf_transformations.euler_from_quaternion([ + transform.rotation.x, + transform.rotation.y, + transform.rotation.z, + transform.rotation.w + ])[2] + + return x, y, theta + + def getLaserScan(self, num_samples): + if self.map is None or self.initial_pose is None or self.mat_base_to_laser is None: self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples return + x, y, theta = self.getLaserPose() + mx, my = worldToMap(x, y, self.map) if not mx and not my: @@ -298,6 +314,38 @@ def getLaserScan(self, num_samples, x, y, theta): curr_angle = theta + self.scan_msg.angle_min + i * self.scan_msg.angle_increment self.scan_msg.ranges[i] = self.findMapRange(mx, my, x, y, curr_angle) + # def getLaserScan(self, num_samples): + # if self.map is None or self.initial_pose is None or self.mat_base_to_laser is None: + # self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples + # return + + # x0, y0 ,theta = self.getLaserPose() + + # max_distance = math.sqrt( + # (self.map.info.width * self.map.info.resolution) ** 2 + + # (self.map.info.height * self.map.info.resolution) ** 2) + # for i in range(num_samples): + # curr_angle = theta + self.scan_msg.angle_min + i * self.scan_msg.angle_increment + # x1 = float(x0) + max_distance * math.cos(curr_angle) + # y1 = float(y0) + max_distance * math.sin(curr_angle) + + # line_iterator = LineIterator(float(x0), float(y0), x1, y1, self.map.info.resolution) + + # while line_iterator.isValid(): + # mx, my = worldToMap(line_iterator.getX(), line_iterator.getY(), self.map) + + # if not mx and not my: + # break + + # point_cost = getMapOccupancy(mx, my, self.map) + + # if point_cost >= 60: + # self.scan_msg.ranges[i] = math.sqrt( + # (line_iterator.getX() - x0) ** 2 + (line_iterator.getY() - y0) ** 2) + # break + + # line_iterator.advance() + def findMapRange(self, mx, my, x, y, theta): # Using "A Faster Voxel Traversal Algorithm for Ray Tracing" by Amanatides & Woo # ======== Initialization Phase ========