diff --git a/src/pycram/failures.py b/src/pycram/failures.py index c9647aabc..8a621536b 100644 --- a/src/pycram/failures.py +++ b/src/pycram/failures.py @@ -55,6 +55,12 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) +class SensorMonitoringCondition(PlanFailure): + """Thrown when a sensor monitoring condition is met.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + class DeliveringFailed(HighLevelFailure): """Thrown when delivering plan completely gives up.""" diff --git a/src/pycram/language.py b/src/pycram/language.py index 983ff8e18..76358ea95 100644 --- a/src/pycram/language.py +++ b/src/pycram/language.py @@ -215,7 +215,7 @@ def perform(self): for i in range(self.repeat): for child in self.children: if self.interrupted: - return + return State.FAILED, results try: results.append(child.resolve().perform()) except PlanFailure as e: diff --git a/src/pycram/ros_utils/force_torque_sensor.py b/src/pycram/ros_utils/force_torque_sensor.py index e3af68b5b..43b6cd392 100644 --- a/src/pycram/ros_utils/force_torque_sensor.py +++ b/src/pycram/ros_utils/force_torque_sensor.py @@ -8,6 +8,7 @@ from ..datastructures.enums import FilterConfig from ..datastructures.world import World +from ..failures import SensorMonitoringCondition from ..ros.filter import Butterworth from ..ros.data_types import Time from ..ros.publisher import create_publisher @@ -133,6 +134,14 @@ def _get_robot_parameters(self): elif self.robot_name == 'iai_donbot': self.wrench_topic_name = '/kms40_driver/wrench' + + elif self.robot_name == 'pr2': + raw_data = "/ft/l_gripper_motor_zeroed" + raw_data_10_sample_moving_average = "/ft/l_gripper_motor_zeroed_avg" + derivative_data = "/ft/l_gripper_motor_zeroed_derivative" + derivative_data_10_sample_moving_average = "/ft/l_gripper_motor_zeroed_derivative_avg" + + self.wrench_topic_name = raw_data else: rospy.logerr(f'{self.robot_name} is not supported') @@ -163,6 +172,7 @@ def _get_filter(self, order=4, cutoff=10, fs=60): def _filter_data(self, current_wrench_data: WrenchStamped) -> WrenchStamped: filtered_data = WrenchStamped() + filtered_data.header = current_wrench_data.header for attr in ['x', 'y', 'z']: force_values = [getattr(val.wrench.force, attr) for val in self.prev_values] + [ getattr(current_wrench_data.wrench.force, attr)] @@ -210,19 +220,34 @@ def get_derivative(self, is_filtered=True) -> WrenchStamped: """ Calculate the derivative of current data. - :param is_filtered: Decides about using filtered or raw data + :param is_filtered: Decides about using filtered or raw data. + :return: The derivative as a WrenchStamped object. Returns a zeroed derivative if only one data point exists. """ status = self.filtered if is_filtered else self.unfiltered + if len(self.whole_data[status]) < 2: + return WrenchStamped() + before: WrenchStamped = self.whole_data[status][-2] after: WrenchStamped = self.whole_data[status][-1] derivative = WrenchStamped() - derivative.wrench.force.x = before.wrench.force.x - after.wrench.force.x - derivative.wrench.force.y = before.wrench.force.y - after.wrench.force.y - derivative.wrench.force.z = before.wrench.force.z - after.wrench.force.z - derivative.wrench.torque.x = before.wrench.torque.x - after.wrench.torque.x - derivative.wrench.torque.y = before.wrench.torque.y - after.wrench.torque.y - derivative.wrench.torque.z = before.wrench.torque.z - after.wrench.torque.z + dt = (after.header.stamp - before.header.stamp).to_sec() + if dt == 0: + return WrenchStamped() + + derivative.wrench.force.x = (after.wrench.force.x - before.wrench.force.x) / dt + derivative.wrench.force.y = (after.wrench.force.y - before.wrench.force.y) / dt + derivative.wrench.force.z = (after.wrench.force.z - before.wrench.force.z) / dt + derivative.wrench.torque.x = (after.wrench.torque.x - before.wrench.torque.x) / dt + derivative.wrench.torque.y = (after.wrench.torque.y - before.wrench.torque.y) / dt + derivative.wrench.torque.z = (after.wrench.torque.z - before.wrench.torque.z) / dt return derivative + + def human_touch_monitoring(self): + rospy.loginfo_once("Now monitoring for human touch") + if self.robot_name == 'pr2': + der = self.get_derivative() + if abs(der.wrench.torque.x) > 3: + return SensorMonitoringCondition