Skip to content

Commit

Permalink
Add schema checking for all sent messages (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
leandropineda authored Jun 30, 2021
1 parent 79991fd commit 074c624
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 34 deletions.
38 changes: 17 additions & 21 deletions massrobotics_amr_sender_py/massrobotics_amr_sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# POSSIBILITY OF SUCH DAMAGE.


from jsonschema import exceptions as jsonschema_exc
import websockets
import json
import asyncio
Expand Down Expand Up @@ -143,14 +144,12 @@ def _process_config(self):
TODO: re-run on configuration file changes
TODO: deregister callbacks on configuration changes
"""
# Update status report with local parameters
for param_name in self._config.parameters_by_source[CFG_PARAMETER_LOCAL]:
param_value = self._config.get_parameter_value(param_name)
self.mass_identity_report.update_parameter(name=param_name, value=param_value)

for param_name in self._config.parameters_by_source[CFG_PARAMETER_ENVVAR]:
param_value = self._config.get_parameter_value(param_name)
self.mass_identity_report.update_parameter(name=param_name, value=param_value)
# Update identity report values
for param_name in self.mass_identity_report.schema_properties:
if param_name in self._config.parameters_by_source[CFG_PARAMETER_ENVVAR] or \
param_name in self._config.parameters_by_source[CFG_PARAMETER_LOCAL]:
param_value = self._config.get_parameter_value(param_name)
self.mass_identity_report.update_parameter(name=param_name, value=param_value)

# Register callbacks for rosTopic parameters
for param_name in self._config.parameters_by_source[CFG_PARAMETER_ROS_TOPIC]:
Expand All @@ -175,6 +174,14 @@ async def _async_send_report(self, mass_object):
mass_object (:obj:`MassObject`): Identity or Status report
"""
self.logger.debug(f"Validating schema MassRobotics object schema")
try:
mass_object.validate_schema()
except jsonschema_exc.ValidationError as ex:
self.logger.error(f"Invalid schema for '{type(mass_object)}' message. "
f"The error reported is: '{ex.message}'. Ignoring message.")
return

self.logger.debug(f"Sending object ({type(mass_object)}): {mass_object.data}")
try:
await self._wss_conn.ensure_open()
Expand Down Expand Up @@ -206,7 +213,6 @@ def _get_frame_id_from_header(self, msg):
return frame_id

def _callback_pose_stamped_msg(self, param_name, msg_field, data):

self.logger.debug(f"Processing '{type(data)}' message: {data}")
if msg_field:
self.logger.warning(f"Parameter {param_name} doesn't support `msgField`. Ignoring.")
Expand All @@ -229,7 +235,6 @@ def _callback_pose_stamped_msg(self, param_name, msg_field, data):
}

def _callback_battery_state_msg(self, param_name, msg_field, data):

self.logger.debug(f"Processing '{type(data)}' message: {data}")
try:
self.mass_status_report.data[param_name] = getattr(data, msg_field)
Expand All @@ -238,7 +243,6 @@ def _callback_battery_state_msg(self, param_name, msg_field, data):
f"type '{type(data)}' doesn't exist")

def _callback_twist_stamped_msg(self, param_name, msg_field, data):

self.logger.debug(f"Processing '{type(data)}' message: {data}")
if msg_field:
self.logger.warning(f"Parameter {param_name} doesn't support `msgField`. Ignoring.")
Expand Down Expand Up @@ -268,15 +272,13 @@ def _callback_twist_stamped_msg(self, param_name, msg_field, data):
}

def _callback_string_msg(self, param_name, msg_field, data):

self.logger.debug(f"Processing '{type(data)}' message: {data}")
if msg_field:
self.logger.warning(f"Parameter {param_name} doesn't support `msgField`. Ignoring.")

self.mass_status_report.data[param_name] = data.data

def _callback_path_msg(self, param_name, msg_field, data):

self.logger.debug(f"Processing '{type(data)}' message: {data}")
if msg_field:
self.logger.warning(f"Parameter {param_name} doesn't support `msgField`. Ignoring.")
Expand Down Expand Up @@ -309,7 +311,6 @@ def _callback_path_msg(self, param_name, msg_field, data):
self.mass_status_report.data[param_name] = mass_predicted_locations

def _callback_error_codes_msg(self, param_name, msg_field, data):

self.logger.debug(f"Processing '{type(data)}' message: {data}")
if msg_field:
self.logger.warning(f"Parameter {param_name} doesn't support `msgField`. Ignoring.")
Expand Down Expand Up @@ -384,29 +385,22 @@ def register_mass_adapter(self, param_name, topic_name):
callback = None
if param_name == 'velocity':
callback = partial(self._callback_twist_stamped_msg, param_name, msg_field)
self.logger.info(f"Registered callback for parameter '{param_name}' (TwistStamped)")
if param_name == 'batteryPercentage':
callback = partial(self._callback_battery_state_msg, param_name, msg_field)
self.logger.info(f"Registered callback for parameter '{param_name}' (BatteryState)")
if param_name == 'location':
callback = partial(self._callback_pose_stamped_msg, param_name, msg_field)
self.logger.info(f"Registered callback for parameter '{param_name}' (PoseStamped)")
if param_name in ('destinations', 'path'):
callback = partial(self._callback_path_msg, param_name, msg_field)
self.logger.info(f"Registered callback for parameter '{param_name}' (Path)")
if param_name == 'errorCodes':
callback = partial(self._callback_error_codes_msg, param_name, msg_field)
self.logger.info(f"Registered callback for parameter '{param_name}' (String)")

# if param_name doesn't have any specific callback, fallback to string
if not callback and topic_type_t is ros_std_msgs.String:
callback = partial(self._callback_string_msg, param_name, msg_field)
self.logger.info(f"Registered callback for parameter '{param_name}' (String)")

# TODO: add all remaining 'scalar' types
if not callback and topic_type_t in (ros_std_msgs.Float32, ros_std_msgs.Float64):
callback = partial(self._callback_string_msg, param_name, msg_field)
self.logger.info(f"Registered callback for parameter '{param_name}' (Number)")

if not callback:
self.logger.error(f"Callback for parameter '{param_name}' "
Expand All @@ -419,4 +413,6 @@ def register_mass_adapter(self, param_name, topic_name):
callback=callback,
qos_profile=10)

self.logger.info(f"Registered callback for parameter '{param_name}' ({topic_type_name})")

return True
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(self, **kwargs) -> None:

self.data = {MASS_REPORT_UUID: kwargs[MASS_REPORT_UUID]}
self._update_timestamp()
self.schema = self._load_schema()

def _update_timestamp(self):
# As per Mass example, data format is ISO8601
Expand Down Expand Up @@ -91,15 +92,25 @@ def update_parameter(self, name, value):
self.data[name] = value
self._update_timestamp()

def _validate_schema(self):
def _load_schema(self):
cwd = Path(__file__).resolve().parent
jsonschema_def_path = str(cwd / 'schema.json')
with open(jsonschema_def_path, 'r') as fp:
schema = json.load(fp)

# Fail if additional properties are found on the objects.
# This can be consider as a workaround, the ``additionalProperties``
# keyword should be set in the original schema.
for obj_name in ('identityReport', 'statusReport'):
if 'additionalProperties' not in schema[obj_name]:
schema[obj_name]['additionalProperties'] = False

return schema

def validate_schema(self):
# TODO: capture relevant exceptions and improve
# error reporting
jsonschema.validate(instance=self.data, schema=schema)
jsonschema.validate(instance=self.data, schema=self.schema)


class IdentityReport(MassObject):
Expand Down Expand Up @@ -145,7 +156,7 @@ def __init__(self, **kwargs) -> None:
for property_name, property_value in kwargs.items():
self.data[property_name] = property_value

self._validate_schema()
self.validate_schema()


class StatusReport(MassObject):
Expand Down Expand Up @@ -195,4 +206,4 @@ def __init__(self, **kwargs) -> None:
for property_name, property_value in kwargs.items():
self.data[property_name] = property_value

self._validate_schema()
self.validate_schema()
38 changes: 29 additions & 9 deletions massrobotics_amr_sender_py/test/test_mass_interop.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ def test_massrobotics_amr_node_init():
{
'msg_type': ros_std_msgs.String,
'topic': '/we_b_robots/mode',
'msg': ros_std_msgs.String(data='foo'),
'msg': ros_std_msgs.String(data='charging'),
'property': 'operationalState',
'value': 'foo'
'value': 'charging'
},
{
'msg_type': ros_geometry_msgs.PoseStamped,
Expand Down Expand Up @@ -360,13 +360,6 @@ def test_massrobotics_amr_node_init():
'msg': ros_std_msgs.String(),
'property': 'errorCodes',
'value': []
},
{
'msg_type': ros_std_msgs.String,
'topic': '/troubleshooting/errorcodes',
'msg': ros_std_msgs.String(data='error1,error2,error1'),
'property': 'errorCodes',
'value': ['error1', 'error2', 'error1'] # FIXME: duplicate are no allowed
}
]

Expand Down Expand Up @@ -410,3 +403,30 @@ def test_massrobotics_amr_node_status_report_callbacks(event_loop):
# Mass identity report is sent once on Node init
# and after processing all messages on ``STATUS_REPORT_TESTS``
assert node._wss_conn.send.call_count == 2


def test_massrobotics_amr_node_status_report_not_sent_on_invalid_schema(event_loop):
rclpy.init()
# create the node we want to test
node = MassRoboticsAMRInteropNode(parameter_overrides=[
Parameter("config_file", value=str(config_file_test))
])

node.mass_status_report.data['operationalState'] = 'foobar'

rclpy.spin_once(node, timeout_sec=.1)

# Try to send a status report with an invalid schema i.e. ``foobar`` operational
# state is not an allowed value.
event_loop.run_until_complete(node._async_send_report(node.mass_status_report))

rclpy.shutdown()

# assert connect method has been called once
assert websockets.connect.call_count == 1
# assert mocked status published thread has been called once
assert node._status_publisher_thread.call_count == 1
# Mass identity report is sent once on Node init
# That should be the only successful call given that the status
# report sent above is invalid and the node should not send it.
assert node._wss_conn.send.call_count == 1

0 comments on commit 074c624

Please sign in to comment.