diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ae0f320 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +# python auto generated files and directories +__pycache__/ diff --git a/sr_ros2_python_utils/transforms.py b/sr_ros2_python_utils/transforms.py index bec7a1d..7f9d4bd 100644 --- a/sr_ros2_python_utils/transforms.py +++ b/sr_ros2_python_utils/transforms.py @@ -23,19 +23,22 @@ from tf2_ros import TransformException from tf2_ros.buffer import Buffer from tf2_ros.transform_listener import TransformListener -from geometry_msgs.msg import Pose, PoseStamped, Vector3, Vector3Stamped +from geometry_msgs.msg import Pose, PoseStamped, Vector3, Vector3Stamped, TransformStamped from numpy import arctan2, arcsin -def _apply_local_offset(pose:PoseStamped, x:float=0.0, y:float=0.0, z:float=0.0) -> PoseStamped: +def _apply_local_offset( + pose: PoseStamped, x: float = 0.0, y: float = 0.0, z: float = 0.0 +) -> PoseStamped: """Applies linear offset to the given pose in the local coordinate system""" result_pose = pose result_pose.pose.position.x += x result_pose.pose.position.y += y result_pose.pose.position.z += z - + return result_pose + def _euler_from_quaternion(quaternion): """ Converts quaternion (w in last place) to euler roll, pitch, yaw @@ -60,7 +63,8 @@ def _euler_from_quaternion(quaternion): return roll, pitch, yaw -def _quaternion_from_euler(ai:float, aj:float, ak:float): + +def _quaternion_from_euler(ai: float, aj: float, ak: float): # quaternion order is [qx, qy, qz, qw] ai /= 2.0 aj /= 2.0 @@ -71,21 +75,21 @@ def _quaternion_from_euler(ai:float, aj:float, ak:float): sj = math.sin(aj) ck = math.cos(ak) sk = math.sin(ak) - cc = ci*ck - cs = ci*sk - sc = si*ck - ss = si*sk + cc = ci * ck + cs = ci * sk + sc = si * ck + ss = si * sk - q = numpy.empty((4, )) - q[0] = cj*sc - sj*cs - q[1] = cj*ss + sj*cc - q[2] = cj*cs - sj*sc - q[3] = cj*cc + sj*ss + q = numpy.empty((4,)) + q[0] = cj * sc - sj * cs + q[1] = cj * ss + sj * cc + q[2] = cj * cs - sj * sc + q[3] = cj * cc + sj * ss return q -def _quaternion_multiply(q0:ArrayLike, q1:ArrayLike) -> ArrayLike: +def _quaternion_multiply(q0: ArrayLike, q1: ArrayLike) -> ArrayLike: """ Multiplies two quaternions. Convention used [qx, qy, qz, qw] @@ -122,8 +126,68 @@ def _quaternion_multiply(q0:ArrayLike, q1:ArrayLike) -> ArrayLike: return final_quaternion +def _mirror_vector_at_axis( + vector: Vector3, x_axis: bool = False, y_axis: bool = False, z_axis: bool = False +) -> Vector3: + x_mirror = -1 if x_axis else 1 + y_mirror = -1 if y_axis else 1 + z_mirror = -1 if z_axis else 1 + + mirrored_vector = Vector3() + mirrored_vector.x = x_mirror * vector.x + mirrored_vector.y = y_mirror * vector.y + mirrored_vector.z = z_mirror * vector.z + + return mirrored_vector + + +def _rotate_vector( + vector: Vector3, + theta_x: float = 0.0, + theta_y: float = 0.0, + theta_z: float = 0.0, +) -> Vector3: + v = numpy.array([vector.x, vector.y, vector.z]) + # Rotation matrices + R_x = numpy.array( + [ + [1, 0, 0], + [0, numpy.cos(theta_x), -numpy.sin(theta_x)], + [0, numpy.sin(theta_x), numpy.cos(theta_x)], + ] + ) + R_y = numpy.array( + [ + [numpy.cos(theta_y), 0, numpy.sin(theta_y)], + [0, 1, 0], + [-numpy.sin(theta_y), 0, numpy.cos(theta_y)], + ] + ) + R_z = numpy.array( + [ + [numpy.cos(theta_z), -numpy.sin(theta_z), 0], + [numpy.sin(theta_z), numpy.cos(theta_z), 0], + [0, 0, 1], + ] + ) + + # Combined rotation matrix + R = R_z @ R_y @ R_x + + # Rotate the vector + v_rotated = R @ v + + vector_rotated = Vector3() + vector_rotated.x = v_rotated[0] + vector_rotated.y = v_rotated[1] + vector_rotated.z = v_rotated[2] + return vector_rotated + + class TCPTransforms: - def __init__(self, node:Node, tcp_link_name:str='tcp_link', tool_link_name:str='tcp_gripper') -> None: + def __init__( + self, node: Node, tcp_link_name: str = "tcp_link", tool_link_name: str = "tcp_gripper" + ) -> None: """ TCP transformation helper class @@ -146,11 +210,17 @@ def get_transform(self, target_frame: str, source_frame: str): rate = self.node.create_rate(1) for t in range(1, 6): try: - transform = self.tf_buffer.lookup_transform(target_frame, source_frame, rclpy.time.Time()) + transform = self.tf_buffer.lookup_transform( + target_frame, source_frame, rclpy.time.Time() + ) except TransformException as e: - self.node.get_logger().warn(f"Could not transform '{source_frame}' to '{target_frame}': {e}") + self.node.get_logger().warn( + f"Could not transform '{source_frame}' to '{target_frame}': {e}" + ) if t == 5: - self.node.get_logger().error(f"Transform between {source_frame} and {target_frame} does not exist or the data are too old!") + self.node.get_logger().error( + f"Transform between {source_frame} and {target_frame} does not exist or the data are too old!" + ) return None self.node.get_logger().info(f"Trying {5-t} more times ...") rate.sleep() @@ -162,31 +232,51 @@ def to_target_frame(self, pose_stamped: PoseStamped, target_frame: str): return None return self.tf_buffer.transform(pose_stamped, target_frame) - def to_from_tcp_pose_conversion(self, pose_source_frame: Pose, source_frame: str, target_frame: str, apply_tool_offset: bool=True) -> Pose: + def tcp_pose_shifted_by_tool_pose( + self, + pose_source_frame: Pose, + source_frame: str, + target_frame: str, + apply_tool_offset: bool = True, + ) -> Pose: """apply_tool_tf is used when pose source should be first transformed locally with a tool offset""" # frame transforms - # P_tcp|tgt = tgt_T_src * P_tcp|src - # tgt_T_src = lookup(target_frame, source_frame) = src_tgt_transform - # P_tcp|src = src_T_tool * P_tcp|tool = tcp_pose_source_frame - # src_T_tool = tool_src_transform (from input pose) # P_tcp|tool = tcp_pose_tool_frame (from tcp_tool_transform = lookup(tool, tcp)) + # src_T_tool = tool_src_transform (from input pose) + # P_tcp|src = src_T_tool * P_tcp|tool = tcp_pose_source_frame + # tgt_T_src = lookup(target_frame, source_frame) = src_tgt_transform + # P_tcp|tgt = tgt_T_src * P_tcp|src src_tgt_transform = self.get_transform(target_frame, source_frame) if not src_tgt_transform: return None + if apply_tool_offset: # transformation from tcp to tool - tcp_tool_transform = self.get_transform(self.tool_frame, self.tcp_frame) - if not tcp_tool_transform: + tcp_in_tool_transform = self.get_transform(self.tool_frame, self.tcp_frame) + print(f"tcp_in_tool_transform {tcp_in_tool_transform}") + if not tcp_in_tool_transform: return None # create a tool pose in tcp_frame to which the transform from source (=tcp) frame can be applied - tcp_pose_tool_frame = tf2_geometry_msgs.Pose() - #tcp_pose_tool_frame.header.frame_id = self.tool_frame - tcp_pose_tool_frame.position.x = tcp_tool_transform.transform.translation.x - tcp_pose_tool_frame.position.y = tcp_tool_transform.transform.translation.y - tcp_pose_tool_frame.position.z = tcp_tool_transform.transform.translation.z - tcp_pose_tool_frame.orientation = tcp_tool_transform.transform.rotation + tcp_pose_tool_frame = tf2_geometry_msgs.PoseStamped() + tcp_pose_tool_frame.header.frame_id = self.tool_frame + tcp_pose_tool_frame.pose.position.x = -tcp_in_tool_transform.transform.translation.x + tcp_pose_tool_frame.pose.position.y = tcp_in_tool_transform.transform.translation.y + tcp_pose_tool_frame.pose.position.z = -tcp_in_tool_transform.transform.translation.z + tcp_pose_tool_frame.pose.orientation = tcp_in_tool_transform.transform.rotation + + # Rotate TCP Pose in Tool to the source frame + tool_source_transform = self.get_transform(source_frame, self.tool_frame) + tool_source_transform.transform.translation.x = 0.0 + tool_source_transform.transform.translation.y = 0.0 + tool_source_transform.transform.translation.z = 0.0 + # tcp_pose_tool_frame = tf2_geometry_msgs.do_transform_pose_stamped( + # tcp_pose_tool_frame, tool_source_transform + # ) + print(f"tool_source_transform {tool_source_transform}") + print(f"tcp_pose_tool_frame {tcp_pose_tool_frame}") + # create a transform from source tool to source frame tool_src_transform = tf2_geometry_msgs.TransformStamped() tool_src_transform.header.frame_id = source_frame @@ -200,23 +290,141 @@ def to_from_tcp_pose_conversion(self, pose_source_frame: Pose, source_frame: str tool_src_transform = None if tcp_pose_tool_frame is not None: - # get the tcp pose in source frame by applying tool to source frame transform to the tcp in tool frame pose - tcp_pose_source_frame = tf2_geometry_msgs.do_transform_pose(tcp_pose_tool_frame, tool_src_transform) + # get the tcp pose in source frame by applying tool to source frame transform to the tcp in tool frame pose + tcp_pose_source_frame = tf2_geometry_msgs.do_transform_pose_stamped( + tcp_pose_tool_frame, tool_src_transform + ) + print(f"tcp_pose_source_frame {tcp_pose_source_frame}") else: # the tcp pose is the source pose tcp_pose_source_frame = pose_source_frame # apply the target frame to source frame transformation - pose_target_frame = tf2_geometry_msgs.do_transform_pose(tcp_pose_source_frame, src_tgt_transform) + pose_target_frame = tf2_geometry_msgs.do_transform_pose_stamped( + tcp_pose_source_frame, src_tgt_transform + ) + print(f"pose_target_frame {pose_target_frame}") + + return pose_target_frame.pose + + # 1. use PoseStamped in a new method with sensible name better then "to_from_tcp_pose_conversion" + # 2. Use the new method from "to_from_tcp_pose_conversion" + # 3. Use the new method everywhere you can find and switch fastly (if necessray add an intermdeide variable with PoseStamped to the code and then extract the pose for the rest) + # 4. Add deprication notice to "to_from_tcp_pose_conversion" as "rclpy.warn()" + + def to_from_tcp_pose_conversion( + self, + pose_source_frame: Pose, + source_frame: str, + target_frame: str, + apply_tool_offset: bool = True, + ignore_tool_to_tcp_rotation: bool = False, + ) -> Pose: + """apply_tool_tf is used when pose source should be first transformed locally with a tool offset""" - return pose_target_frame + """First we get local transformation from tool to TCP, then we transform this new Pose (from TCP in tool frame), to the target position using input pose.""" + """The question we aks ourself here is: where should my 'TCP' be to achieve the 'pose_source_frame' with my 'tool'""" + + # frame transforms + # P_tcp|tgt = tgt_T_src * P_tcp|src + # tgt_T_src = lookup(target_frame, source_frame) = src_tgt_transform + # P_tcp|src = src_T_tool * P_tcp|tool = tcp_pose_source_frame + # src_T_tool = tool_src_transform (from input pose) + # P_tcp|tool = tcp_pose_tool_frame (from tcp_tool_transform = lookup(tool, tcp)) + pose_source_frame_stamped = tf2_geometry_msgs.PoseStamped() + pose_source_frame_stamped.header.frame_id = source_frame + pose_source_frame_stamped.pose.position = pose_source_frame.position + pose_source_frame_stamped.pose.orientation = pose_source_frame.orientation - def to_from_tcp_vec3_conversion(self, vec3_source_frame: Vector3Stamped, source_frame: str, target_frame: str) -> Vector3: - """Apply tf transformation to a vector3""" src_tgt_transform = self.get_transform(target_frame, source_frame) if not src_tgt_transform: return None + if apply_tool_offset: + # transformation from tcp to tool + tcp_tool_transform = self.get_transform(self.tool_frame, self.tcp_frame) + if not tcp_tool_transform: + return None + # create a tool pose in tcp_frame to which the transform from source (=tcp) frame can be applied + tcp_pose_tool_frame = tf2_geometry_msgs.PoseStamped() + tcp_pose_tool_frame.header.frame_id = self.tool_frame + tcp_pose_tool_frame.pose.position.x = tcp_tool_transform.transform.translation.x + tcp_pose_tool_frame.pose.position.y = tcp_tool_transform.transform.translation.y + tcp_pose_tool_frame.pose.position.z = tcp_tool_transform.transform.translation.z + if ignore_tool_to_tcp_rotation: # don't set rotation + # Rotate vector the the TCP coordinate system + tool_in_tcp_transform = self.get_transform(self.tcp_frame, self.tool_frame) + if not tool_in_tcp_transform: + return None + ## remove translation first we just want to rotate to orientation of tcp frame + tool_in_tcp_transform.transform.translation.x = 0.0 + tool_in_tcp_transform.transform.translation.y = 0.0 + tool_in_tcp_transform.transform.translation.z = 0.0 + ## transform the vector to tcp_frame + vector_tool_orientated = Vector3Stamped() + vector_tool_orientated.header.frame_id = self.tool_frame + vector_tool_orientated.vector.x = tcp_pose_tool_frame.pose.position.x + vector_tool_orientated.vector.y = tcp_pose_tool_frame.pose.position.y + vector_tool_orientated.vector.z = tcp_pose_tool_frame.pose.position.z + vector_tcp_orientated = tf2_geometry_msgs.do_transform_vector3( + vector_tool_orientated, tool_in_tcp_transform + ) + tcp_pose_tool_frame.pose.position.x = vector_tcp_orientated.vector.x + tcp_pose_tool_frame.pose.position.y = vector_tcp_orientated.vector.y + tcp_pose_tool_frame.pose.position.z = vector_tcp_orientated.vector.z + + else: + tcp_pose_tool_frame.pose.orientation = tcp_tool_transform.transform.rotation - vec3_target_frame = tf2_geometry_msgs.do_transform_vector3(vec3_source_frame, src_tgt_transform) + # create a transform from source tool to source frame + tool_src_transform = tf2_geometry_msgs.TransformStamped() + tool_src_transform.header.frame_id = source_frame + tool_src_transform.child_frame_id = self.tool_frame + tool_src_transform.transform.translation.x = pose_source_frame_stamped.pose.position.x + tool_src_transform.transform.translation.y = pose_source_frame_stamped.pose.position.y + tool_src_transform.transform.translation.z = pose_source_frame_stamped.pose.position.z + tool_src_transform.transform.rotation = pose_source_frame_stamped.pose.orientation + else: + tcp_pose_tool_frame = None + tool_src_transform = None - return vec3_target_frame.vector + if tcp_pose_tool_frame is not None: + # get the tcp pose in source frame by applying tool to source frame transform to the tcp in tool frame pose + tcp_pose_source_frame = tf2_geometry_msgs.do_transform_pose_stamped( + tcp_pose_tool_frame, tool_src_transform + ) + else: + # the tcp pose is the source pose + tcp_pose_source_frame = pose_source_frame_stamped + # apply the target frame to source frame transformation + pose_target_frame = tf2_geometry_msgs.do_transform_pose_stamped( + tcp_pose_source_frame, src_tgt_transform + ) + + return pose_target_frame.pose + + def transform_vector3stamped_to_target_frame( + self, + vec3_source_frame: Vector3Stamped, + target_frame: str, + ignore_translation: bool = False, + ignore_rotation: bool = False, + ) -> Vector3Stamped: + """Apply tf transformation to a vector3""" + src_tgt_transform = self.get_transform(target_frame, vec3_source_frame.header.frame_id) + if not src_tgt_transform: + return None + if ignore_translation: + src_tgt_transform.transform.translation.x = 0.0 + src_tgt_transform.transform.translation.y = 0.0 + src_tgt_transform.transform.translation.z = 0.0 + if ignore_rotation: + src_tgt_transform.transform.rotation.x = 0.0 + src_tgt_transform.transform.rotation.y = 0.0 + src_tgt_transform.transform.rotation.z = 0.0 + src_tgt_transform.transform.rotation.w = 1.0 + + vec3_target_frame = tf2_geometry_msgs.do_transform_vector3( + vec3_source_frame, src_tgt_transform + ) + + return vec3_target_frame diff --git a/sr_ros2_python_utils/visualization_publishers.py b/sr_ros2_python_utils/visualization_publishers.py index faa3937..2acfb3e 100644 --- a/sr_ros2_python_utils/visualization_publishers.py +++ b/sr_ros2_python_utils/visualization_publishers.py @@ -12,14 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Tuple + +from builtin_interfaces.msg import Duration from rclpy.node import Node -from geometry_msgs.msg import PoseStamped, Pose, TransformStamped +from geometry_msgs.msg import Point, PoseStamped, Pose, TransformStamped, Vector3Stamped from tf2_ros import TransformBroadcaster, StaticTransformBroadcaster +from visualization_msgs.msg import Marker class VisualizatonPublisher: - def __init__(self, node:Node) -> None: + def __init__( + self, node: Node, marker_publisher_name: str = "visualization_marker_publisher" + ) -> None: """ Create and prepare class by providing a node. @@ -28,9 +34,66 @@ def __init__(self, node:Node) -> None: self.node = node self.tf_broadcaster = TransformBroadcaster(self.node) self.tf_static_broadcasters = {} - - - def publish_pose_as_transform(self, pose:Pose, frame_id:str, child_frame_id:str, is_static:bool=False) -> bool: + self.marker_publisher_name = marker_publisher_name + self.marker_publisher = node.create_publisher(Marker, self.marker_publisher_name, 10) + + def publish_vector( + self, + vec: Vector3Stamped, + start_point: Point = Point(), + lifetime: Duration = None, + id: int = None, + namespace: str = "", + marker_color: Tuple[float, float, float, float] = (0.4, 0.18, 0.56862, 1.0), + scale: Tuple[float, float, float] = (0.1, 0.2, 0.3), + ): + if not vec.header.frame_id: + self.node.get_logger().error(f"Publishing of vector failed. No frame id given.") + return + + marker = Marker() + marker.header.frame_id = vec.header.frame_id + marker.header.stamp = self.node.get_clock().now().to_msg() + marker.ns = namespace if namespace else self.node.get_namespace() + used_id = id + # create a unique id based on time if none is given + if not used_id: + seconds, nanoseconds = self.node.get_clock().now().seconds_nanoseconds() + ms = seconds * 1000 + nanoseconds // 1_000_000 + used_id = int(ms) & 0x7FFFFFF + marker.id = used_id + marker.type = Marker.ARROW + marker.action = Marker.ADD + + end_point = Point() + end_point.x = vec.vector.x # Length of the vector + end_point.y = vec.vector.y + end_point.z = vec.vector.z + + marker.points.append(start_point) + marker.points.append(end_point) + + # Set the color (RGBA) + marker.color.r = marker_color[0] + marker.color.g = marker_color[1] + marker.color.b = marker_color[2] + marker.color.a = marker_color[3] + + # Set the scale of the vector (shaft diameter, head diameter, head length) + marker.scale.x = scale[0] # Shaft diameter + marker.scale.y = scale[1] # Head diameter + marker.scale.z = scale[2] # Head length + + # Set the lifetime of the marker + if lifetime: + marker.lifetime = lifetime + + # Publish the marker + self.marker_publisher.publish(marker) + + def publish_pose_as_transform( + self, pose: Pose, frame_id: str, child_frame_id: str, is_static: bool = False + ) -> bool: """ Publish a Pose as a Transform to visualize with Rviz2 using "Axis" display. @@ -63,13 +126,14 @@ def publish_pose_as_transform(self, pose:Pose, frame_id:str, child_frame_id:str, if not tf_exists: # create a new static broadcaster self.tf_static_broadcasters[pair] = StaticTransformBroadcaster(self.node) - self.tf_static_broadcasters[pair].sendTransform(trafo) + self.tf_static_broadcasters[pair].sendTransform(trafo) else: self.tf_broadcaster.sendTransform(trafo) return True - - def publish_pose_stamped_as_transform(self, pose:PoseStamped, child_frame_id:str, is_static:bool=False) -> bool: + def publish_pose_stamped_as_transform( + self, pose: PoseStamped, child_frame_id: str, is_static: bool = False + ) -> bool: """ Publish a Stamped Pose as a Transform to visualize with Rviz2 using "Axis" display. @@ -78,8 +142,12 @@ def publish_pose_stamped_as_transform(self, pose:PoseStamped, child_frame_id:str :returns: False if input data are not valid. """ - if (not pose.header.frame_id): - self.node.get_logger().error("To publish pose stamped as transform `frame_id` in the header has to be set!") + if not pose.header.frame_id: + self.node.get_logger().error( + "To publish pose stamped as transform `frame_id` in the header has to be set!" + ) return False - return self.publish_pose_as_transform(pose.pose, pose.header.frame_id, child_frame_id, is_static) + return self.publish_pose_as_transform( + pose.pose, pose.header.frame_id, child_frame_id, is_static + )