Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Robot_state_Updater] added kitchenupdater from robocup branch #161

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/pycram/ros/ros_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def get_parameter(name: str) -> Any:
return rospy.get_param(name)


def wait_for_message(topic_name: str):
return rospy.wait_for_message(topic_name)
def wait_for_message(topic_name: str, topic_type):
return rospy.wait_for_message(topic_name, topic_type=topic_type)


def is_master_online():
Expand Down
52 changes: 52 additions & 0 deletions src/pycram/ros_utils/robot_state_updater.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import atexit
import tf
import time
import rospy

from geometry_msgs.msg import TransformStamped
from sensor_msgs.msg import JointState
Expand Down Expand Up @@ -67,3 +68,54 @@ def _stop_subscription(self) -> None:
"""
self.tf_timer.shutdown()
self.joint_state_timer.shutdown()


class KitchenStateUpdater:
"""
Updates the robot in the Bullet World with information of the real robot published to ROS topics.
Infos used to update the robot are:
* The current pose of the robot
* The current joint state of the robot
"""

def __init__(self, tf_topic: str, joint_state_topic: str):
"""
The robot state updater uses a TF topic and a joint state topic to get the current state of the robot.

:param tf_topic: Name of the TF topic, needs to publish geometry_msgs/TransformStamped
:param joint_state_topic: Name of the joint state topic, needs to publish sensor_msgs/JointState
"""
self.tf_listener = tf.TransformListener()
rospy.sleep(1)
self.tf_topic = tf_topic
self.joint_state_topic = joint_state_topic

self.joint_state_timer = rospy.Timer(rospy.Duration(0.1), self._subscribe_joint_state)

atexit.register(self._stop_subscription)

def _subscribe_joint_state(self, msg: JointState) -> None:
"""
Sets the current joint configuration of the robot in the bullet world to the configuration published on the topic.
Since this uses rospy.wait_for_message which can have errors when used with threads there might be an attribute error
in the rospy implementation.

:param msg: JointState message published to the topic.
"""
try:
msg = wait_for_message(self.joint_state_topic, JointState)
for name, position in zip(msg.name, msg.position):
try:
# Set the joint state if the joint exists
World.current_world.set_joint_state(name, position)
except KeyError:
# Handle the case where the joint name does not exist
pass
except AttributeError:
pass

def _stop_subscription(self) -> None:
"""
Stops the Timer for TF and joint states and therefore the updating of the robot in the bullet world.
"""
self.joint_state_timer.shutdown()
Loading