Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

joy_teleop: Do not require deadman input for axis mapping #71

Open
wants to merge 3 commits into
base: foxy-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 74 additions & 47 deletions joy_teleop/joy_teleop/joy_teleop.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,19 @@ def set_member(msg: typing.Any, member: str, value: typing.Any) -> None:
setattr(target, ml[-1], value)


def get_arr(arr, idx, default):
if idx < len(arr):
return arr[idx]
else:
return default


class JoyTeleopCommand:

def __init__(self, name: str, config: typing.Dict[str, typing.Any],
button_name: str, axes_name: str) -> None:
self.buttons: typing.List[str] = []
if button_name in config:
self.buttons = config[button_name]
self.axes: typing.List[str] = []
if axes_name in config:
self.axes = config[axes_name]
def __init__(self, name: str, buttons: typing.List[str], axes: typing.List[str]) -> None:
self.name = name
self.buttons: typing.List[str] = buttons
self.axes: typing.List[str] = axes

if len(self.buttons) == 0 and len(self.axes) == 0:
raise JoyTeleopException("No buttons or axes configured for command '{}'".format(name))
Expand All @@ -90,6 +93,7 @@ def __init__(self, name: str, config: typing.Dict[str, typing.Any],
self.min_button = 0
if len(self.buttons) > 0:
self.min_button = int(min(self.buttons))

self.min_axis = 0
if len(self.axes) > 0:
self.min_axis = int(min(self.axes))
Expand All @@ -99,37 +103,34 @@ def __init__(self, name: str, config: typing.Dict[str, typing.Any],
# is a command-specific behavior, the base class only provides the mechanism.
self.active = False

def update_active_from_buttons_and_axes(self, joy_state: sensor_msgs.msg.Joy) -> None:
self.active = False
# a blank Joy message for initial iteration
self.last_joy_state = sensor_msgs.msg.Joy()

def update_active_from_buttons_and_axes(self, joy_state: sensor_msgs.msg.Joy) \
-> typing.Tuple[typing.Set, typing.Set]:

if (self.min_button is not None and len(joy_state.buttons) <= self.min_button) and \
(self.min_axis is not None and len(joy_state.axes) <= self.min_axis):
if len(joy_state.buttons) <= self.min_button and len(joy_state.axes) <= self.min_axis:
# Not enough buttons or axes, so it can't possibly be a message for this command.
return

for button in self.buttons:
try:
self.active |= joy_state.buttons[button] == 1
except IndexError:
# An index error can occur if this command is configured for multiple buttons
# like (0, 10), but the length of the joystick buttons is only 1. Ignore these.
pass
active_buttons = set([but for but in self.buttons
if get_arr(joy_state.buttons, but, 0) == 1])

for axis in self.axes:
try:
self.active |= joy_state.axes[axis] == 1.0
except IndexError:
# An index error can occur if this command is configured for multiple buttons
# like (0, 10), but the length of the joystick buttons is only 1. Ignore these.
pass
# Depending on the joystick, this detection might not work. Some axes on gamepads do not
# have 1.0 for the maximum value, i.e. xbox wireless shoulder triggers are 1.0 neutral
active_axes = set([axis for axis in self.axes
if get_arr(joy_state.axes, axis, 0.0) == 1.0])

self.active = len(active_buttons) != 0 or len(active_axes) != 0

return active_buttons, active_axes


class JoyTeleopTopicCommand(JoyTeleopCommand):

def __init__(self, name: str, config: typing.Dict[str, typing.Any], node: Node) -> None:
super().__init__(name, config, 'deadman_buttons', 'deadman_axes')

self.name = name
self.deadman_buttons = config.get('deadman_buttons', [])
self.deadman_axes = config.get('deadman_axes', [])

self.topic_type = get_interface_type(config['interface_type'], 'msg')

Expand All @@ -149,23 +150,21 @@ def __init__(self, name: str, config: typing.Dict[str, typing.Any], node: Node)
# An 'axis_mapping' takes data from one part of the message and scales and offsets it to
# publish if an activation happens. This is typically used to take joystick analog data
# and republish it as a cmd_vel. It is mutually exclusive with a 'message_value'.
self.axis_mappings = {}
if 'axis_mappings' in config:
self.axis_mappings = config['axis_mappings']
self.axis_mappings = config.get('axis_mappings', {})
self.mapped_axes = []
self.mapped_buttons = []

if self.axis_mappings:
# Now check that the mappings have all of the required configuration.
for mapping, values in self.axis_mappings.items():
if 'axis' not in values and 'button' not in values and 'value' not in values:
raise JoyTeleopException("Axis mapping for '{}' must have an axis, button, "
'or value'.format(name))

if 'axis' in values:
if 'offset' not in values:
raise JoyTeleopException("Axis mapping for '{}' must have an offset"
.format(name))

if 'scale' not in values:
raise JoyTeleopException("Axis mapping for '{}' must have a scale"
.format(name))
self.mapped_axes.append(values['axis'])
if 'button' in values:
self.mapped_buttons.append(values['button'])

if self.msg_value is None and not self.axis_mappings:
raise JoyTeleopException("No 'message_value' or 'axis_mappings' "
Expand All @@ -174,17 +173,37 @@ def __init__(self, name: str, config: typing.Dict[str, typing.Any], node: Node)
raise JoyTeleopException("Only one of 'message_value' or 'axis_mappings' "
"can be configured for command '{}'".format(name))

super().__init__(name, self.deadman_buttons + self.mapped_buttons,
self.deadman_axes + self.mapped_axes)

qos = rclpy.qos.QoSProfile(history=rclpy.qos.QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
durability=rclpy.qos.QoSDurabilityPolicy.VOLATILE)

self.pub = node.create_publisher(self.topic_type, config['topic_name'], qos)

def update_active_from_changed_state(self, joy_state: sensor_msgs.msg.Joy) \
-> typing.Tuple[typing.Set, typing.Set]:

active_buttons = set([but for but in self.mapped_buttons
if get_arr(joy_state.buttons, but, 0) !=
get_arr(self.last_joy_state.buttons, but, 0)])

active_axes = set([axis for axis in self.mapped_axes
if get_arr(joy_state.axes, axis, 0.0) !=
get_arr(self.last_joy_state.axes, axis, 0.0)])

self.active = len(active_buttons) != 0 or len(active_axes) != 0

self.last_joy_state = joy_state
return active_buttons, active_axes

def run(self, node: Node, joy_state: sensor_msgs.msg.Joy) -> None:
# The logic for responding to this joystick press is:
# 1. Save off the current state of active.
# 2. Update the current state of active based on buttons and axes.
# 2. Update the current state of active based on deadman buttons and axes, if set.
# Else set active based on changed button presses since last iteration.
# 3. If this command is currently not active, return without publishing.
# 4. If this is a msg_value, and the value of the previous active is the same as now,
# debounce and return without publishing.
Expand All @@ -193,7 +212,19 @@ def run(self, node: Node, joy_state: sensor_msgs.msg.Joy) -> None:
# continue to be published without debouncing.

last_active = self.active
self.update_active_from_buttons_and_axes(joy_state)
if any((self.deadman_buttons, self.deadman_axes)):
active_buttons, active_axes = self.update_active_from_buttons_and_axes(joy_state)

# if all the deadman buttons are not active then return
if (set(self.deadman_buttons) - active_buttons) != set():
return

# if all the deadman axes are not active then return
if (set(self.deadman_axes) - active_axes) != set():
return
else:
active_buttons, active_axes = self.update_active_from_changed_state(joy_state)

if not self.active:
return
if self.msg_value is not None and last_active == self.active:
Expand Down Expand Up @@ -245,9 +276,7 @@ def run(self, node: Node, joy_state: sensor_msgs.msg.Joy) -> None:
class JoyTeleopServiceCommand(JoyTeleopCommand):

def __init__(self, name: str, config: typing.Dict[str, typing.Any], node: Node) -> None:
super().__init__(name, config, 'buttons', 'axes')

self.name = name
super().__init__(name, config.get('buttons', []), config.get('axes', []))

service_name = config['service_name']

Expand Down Expand Up @@ -294,9 +323,7 @@ def run(self, node: Node, joy_state: sensor_msgs.msg.Joy) -> None:
class JoyTeleopActionCommand(JoyTeleopCommand):

def __init__(self, name: str, config: typing.Dict[str, typing.Any], node: Node) -> None:
super().__init__(name, config, 'buttons', 'axes')

self.name = name
super().__init__(name, config.get('buttons', []), config.get('axes', []))

action_type = get_interface_type(config['interface_type'], 'action')

Expand Down
30 changes: 0 additions & 30 deletions joy_teleop/test/test_parameter_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,6 @@ def test_teleop_axis_mappings_missing_axis_or_button(self):
self.assertEqual(joy_teleop_process.exit_code, 1)
self.assertTrue('must have an axis, button, or value' in joy_teleop_process.output)

def test_teleop_axis_mappings_missing_offset(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these tests still hold value in showing that things are handled despite some things missing. Is it ok to leave them?

Copy link
Contributor Author

@russkel russkel Sep 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are expecting an error when there isn't one after the changes in this PR.

Do you mean you also want the checks for empty offset and scale put back into the code?

parameters = {}
parameters['simple_message.type'] = 'topic'
parameters['simple_message.interface_type'] = 'std_msgs/msg/String'
parameters['simple_message.topic_name'] = '/simple_message_type'
parameters['simple_message.deadman_buttons'] = [2]
parameters['simple_message.axis_mappings.linear-x.axis'] = 1
parameters['simple_message.axis_mappings.linear-x.scale'] = 0.8

with self.launch_joy_teleop(parameters) as joy_teleop_process:
self.assertTrue(joy_teleop_process.wait_for_shutdown(timeout=10))

self.assertEqual(joy_teleop_process.exit_code, 1)
self.assertTrue('must have an offset' in joy_teleop_process.output)

def test_teleop_axis_mappings_missing_scale(self):
parameters = {}
parameters['simple_message.type'] = 'topic'
parameters['simple_message.interface_type'] = 'std_msgs/msg/String'
parameters['simple_message.topic_name'] = '/simple_message_type'
parameters['simple_message.deadman_buttons'] = [2]
parameters['simple_message.axis_mappings.linear-x.axis'] = 1
parameters['simple_message.axis_mappings.linear-x.offset'] = 0.0

with self.launch_joy_teleop(parameters) as joy_teleop_process:
self.assertTrue(joy_teleop_process.wait_for_shutdown(timeout=10))

self.assertEqual(joy_teleop_process.exit_code, 1)
self.assertTrue('must have a scale' in joy_teleop_process.output)

def test_teleop_invalid_message_fields(self):
parameters = {}
parameters['simple_message.type'] = 'topic'
Expand Down