Skip to content

Commit

Permalink
joy_teleop: do not require deadman input for topic mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
russkel committed Jul 18, 2021
1 parent 1c12bb5 commit 63602b9
Showing 1 changed file with 41 additions and 9 deletions.
50 changes: 41 additions & 9 deletions joy_teleop/joy_teleop/joy_teleop.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ def __init__(self, name: str, buttons: typing.List[str], axes: typing.List[str])
# is a command-specific behavior, the base class only provides the mechanism.
self.active = False

# a blank Joy message for initial iteration
self.last_joy_state = sensor_msgs.msg.Joy()

def update_active_from_buttons_and_axes(self, joy_state: sensor_msgs.msg.Joy) \
-> typing.Tuple[typing.Set, typing.Set]:

Expand All @@ -128,7 +131,6 @@ class JoyTeleopTopicCommand(JoyTeleopCommand):
def __init__(self, name: str, config: typing.Dict[str, typing.Any], node: Node) -> None:
self.deadman_buttons = config.get('deadman_buttons', [])
self.deadman_axes = config.get('deadman_axes', [])
super().__init__(name, self.deadman_buttons, self.deadman_axes)

self.topic_type = get_interface_type(config['interface_type'], 'msg')

Expand All @@ -149,31 +151,59 @@ def __init__(self, name: str, config: typing.Dict[str, typing.Any], node: Node)
# publish if an activation happens. This is typically used to take joystick analog data
# and republish it as a cmd_vel. It is mutually exclusive with a 'message_value'.
self.axis_mappings = config.get('axis_mappings', {})
self.mapped_axes = []
self.mapped_buttons = []

if self.axis_mappings:
# Now check that the mappings have all of the required configuration.
for mapping, values in self.axis_mappings.items():
if 'axis' not in values and 'button' not in values and 'value' not in values:
raise JoyTeleopException("Axis mapping for '{}' must have an axis, button, "
'or value'.format(name))

if 'axis' in values:
self.mapped_axes.append(values['axis'])
if 'button' in values:
self.mapped_buttons.append(values['button'])

if self.msg_value is None and not self.axis_mappings:
raise JoyTeleopException("No 'message_value' or 'axis_mappings' "
"configured for command '{}'".format(name))
if self.msg_value is not None and self.axis_mappings:
raise JoyTeleopException("Only one of 'message_value' or 'axis_mappings' "
"can be configured for command '{}'".format(name))

super().__init__(name, self.deadman_buttons + self.mapped_buttons,
self.deadman_axes + self.mapped_axes)

qos = rclpy.qos.QoSProfile(history=rclpy.qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
durability=rclpy.qos.QoSDurabilityPolicy.VOLATILE)

self.pub = node.create_publisher(self.topic_type, config['topic_name'], qos)

def update_active_from_changed_state(self, joy_state: sensor_msgs.msg.Joy) \
-> typing.Tuple[typing.Set, typing.Set]:

active_buttons = set([but for but in self.mapped_buttons
if get_arr(joy_state.buttons, but, 0) !=
get_arr(self.last_joy_state.buttons, but, 0)])

active_axes = set([axis for axis in self.mapped_axes
if get_arr(joy_state.axes, axis, 0.0) !=
get_arr(self.last_joy_state.axes, axis, 0.0)])

self.active = len(active_buttons) != 0 or len(active_axes) != 0

self.last_joy_state = joy_state
return active_buttons, active_axes

def run(self, node: Node, joy_state: sensor_msgs.msg.Joy) -> None:
# The logic for responding to this joystick press is:
# 1. Save off the current state of active.
# 2. Update the current state of active based on buttons and axes.
# 2. Update the current state of active based on deadman buttons and axes, if set.
# Else set active based on changed button presses since last iteration.
# 3. If this command is currently not active, return without publishing.
# 4. If this is a msg_value, and the value of the previous active is the same as now,
# debounce and return without publishing.
Expand All @@ -182,21 +212,23 @@ def run(self, node: Node, joy_state: sensor_msgs.msg.Joy) -> None:
# continue to be published without debouncing.

last_active = self.active
active_buttons, active_axes = self.update_active_from_buttons_and_axes(joy_state)
if not self.active:
return
if self.msg_value is not None and last_active == self.active:
return
if any((self.deadman_buttons, self.deadman_axes)):
active_buttons, active_axes = self.update_active_from_buttons_and_axes(joy_state)

if self.deadman_buttons:
# if all the deadman buttons are not active then return
if (set(self.deadman_buttons) - active_buttons) != set():
return

if self.deadman_axes:
# if all the deadman axes are not active then return
if (set(self.deadman_axes) - active_axes) != set():
return
else:
active_buttons, active_axes = self.update_active_from_changed_state(joy_state)

if not self.active:
return
if self.msg_value is not None and last_active == self.active:
return

if self.msg_value is not None:
# This is the case for a static message.
Expand Down

0 comments on commit 63602b9

Please sign in to comment.