Skip to content

Commit

Permalink
Avoid rolling up an ERROR state when empty GenericAnalyzer blocks are…
Browse files Browse the repository at this point in the history
… marked discard_stale, or when all of their items are STALE. (ros#315)

* If discard_stale = true, don't interpret stale as ERROR when rolling up
* Check in test to exercise behavior

---------

Signed-off-by: Andrew Symington <[email protected]>
Signed-off-by: Christian Henkel <[email protected]>
Co-authored-by: Christian Henkel <[email protected]>
  • Loading branch information
asymingt and ct2034 authored Jan 25, 2024
1 parent a09c0b1 commit 1401f48
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 6 deletions.
5 changes: 5 additions & 0 deletions diagnostic_aggregator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ if(BUILD_TESTING)
test/test_critical_pub.py
TIMEOUT 30
)

ament_add_pytest_test(test_discard_behavior
"${CMAKE_CURRENT_SOURCE_DIR}/test/test_discard_behavior.py"
TIMEOUT 60
)
endif()

install(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class GenericAnalyzerBase : public Analyzer

auto header_status = std::make_shared<diagnostic_msgs::msg::DiagnosticStatus>();
header_status->name = path_;
header_status->level = 0;
header_status->level = diagnostic_msgs::msg::DiagnosticStatus::OK;
header_status->message = "OK";

std::vector<std::shared_ptr<diagnostic_msgs::msg::DiagnosticStatus>> processed;
Expand Down Expand Up @@ -224,22 +224,28 @@ class GenericAnalyzerBase : public Analyzer

// Header is not stale unless all subs are
if (all_stale) {
header_status->level = diagnostic_msgs::msg::DiagnosticStatus::STALE;
// If we elect to discard stale items, then it signals that the absence of an item
// is not considered problematic, so we should allow empty queues to roll up as OK.
if (discard_stale_) {
header_status->level = diagnostic_msgs::msg::DiagnosticStatus::OK;
} else {
header_status->level = diagnostic_msgs::msg::DiagnosticStatus::STALE;
}
} else if (header_status->level == diagnostic_msgs::msg::DiagnosticStatus::STALE) {
header_status->level = 2;
header_status->level = diagnostic_msgs::msg::DiagnosticStatus::ERROR;
}

header_status->message = valToMsg(header_status->level);

// If we expect a given number of items, check that we have this number
if (num_items_expected_ == 0 && items_.empty()) {
header_status->level = 0;
header_status->level = diagnostic_msgs::msg::DiagnosticStatus::OK;
header_status->message = "OK";
} else if ( // NOLINT
num_items_expected_ > 0 &&
static_cast<int8_t>(items_.size()) != num_items_expected_)
{ // NOLINT
int8_t lvl = 2;
int8_t lvl = diagnostic_msgs::msg::DiagnosticStatus::ERROR;
header_status->level = std::max(lvl, static_cast<int8_t>(header_status->level));

std::stringstream expec, item;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class OtherAnalyzer : public GenericAnalyzerBase
processed.begin();
for (; it != processed.end(); ++it) {
if ((*it)->name == path_) {
(*it)->level = 2;
(*it)->level = diagnostic_msgs::msg::DiagnosticStatus::ERROR;
(*it)->message = "Unanalyzed items found in \"Other\"";
break;
}
Expand Down
1 change: 1 addition & 0 deletions diagnostic_aggregator/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<test_depend>ament_cmake_pytest</test_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>launch_pytest</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>launch_testing_ros</test_depend>

Expand Down
299 changes: 299 additions & 0 deletions diagnostic_aggregator/test/test_discard_behavior.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
# Copyright 2023 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# DESCRIPTION
# This test ensures that a parent AnalyzerGroup does not roll up an ERROR state when a
# GenericAnalyzer child block is marked with discard_stale: true and either of these
# conditions are met:
#
# 1. There are no statuses that match any of the GenericAnalyzer child blocks.
# 2. Every matching status in the GenericAnalyzer child block has been marked stale
#
# In this example, if foo and bar have no matching statuses or all of their statuses
# are STALE, they will roll up as OK because the discard_stale: true flag implies that
# stale statuses are disposable.
#
# analyzer:
# ros__parameters:
# path: 'agg'
# pub_rate: 1.0
# analyzers:
# part:
# type: 'diagnostic_aggregator/AnalyzerGroup'
# path: 'part'
# foo:
# type: 'diagnostic_aggregator/GenericAnalyzer'
# path: 'foo'
# find_and_remove_prefix: ['/foo:']
# num_items: 1
# bar:
# type: 'diagnostic_aggregator/GenericAnalyzer'
# path: 'bar'
# find_and_remove_prefix: ['/bar:']
# discard_stale: true

# Python includes.
from collections import namedtuple
import tempfile

# ROS2 includes.
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
import launch
import launch_pytest
import launch_ros
import pytest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.task import Future


# All tests take a common structure.
TestMetadata = namedtuple(
'TestMetadata',
['foo_discard', 'foo_status', 'bar_discard', 'bar_status', 'agg_expected'],
)

# A status value of 'None' means that the state is never sent (it's missing).
TEST_METADATA = [
# CASE 1: both 'foo' and 'bar' are marked discard_stale := true
TestMetadata(
foo_discard=True,
foo_status=None,
bar_discard=True,
bar_status=None,
agg_expected=DiagnosticStatus.OK,
),
TestMetadata(
foo_discard=True,
foo_status=DiagnosticStatus.STALE,
bar_discard=True,
bar_status=DiagnosticStatus.STALE,
agg_expected=DiagnosticStatus.OK,
),
TestMetadata(
foo_discard=True,
foo_status=None,
bar_discard=True,
bar_status=DiagnosticStatus.STALE,
agg_expected=DiagnosticStatus.OK,
),
# CASE 2: both 'foo' and 'bar' are marked discard_stale := false
TestMetadata(
foo_discard=False,
foo_status=None,
bar_discard=False,
bar_status=None,
agg_expected=DiagnosticStatus.STALE,
),
TestMetadata(
foo_discard=False,
foo_status=DiagnosticStatus.STALE,
bar_discard=False,
bar_status=DiagnosticStatus.STALE,
agg_expected=DiagnosticStatus.STALE,
),
TestMetadata(
foo_discard=False,
foo_status=None,
bar_discard=False,
bar_status=DiagnosticStatus.STALE,
agg_expected=DiagnosticStatus.STALE,
),
# CASE 3: one of 'foo' or 'bar' are marked discard_stale := true
TestMetadata(
foo_discard=True,
foo_status=None,
bar_discard=False,
bar_status=None,
agg_expected=DiagnosticStatus.ERROR, # <-- This is the case we are testing for.
# if one of the children is *not* marked discard_stale := true and
# there are no statuses, then the parent should roll up to ERROR.
),
TestMetadata(
foo_discard=True,
foo_status=DiagnosticStatus.OK,
bar_discard=False,
bar_status=None,
agg_expected=DiagnosticStatus.ERROR,
),
TestMetadata(
foo_discard=True,
foo_status=None,
bar_discard=False,
bar_status=DiagnosticStatus.OK,
agg_expected=DiagnosticStatus.OK, # <-- This is the case we are testing for.
# but if a child is marked discard_stale := true and there are no statuses,
# the parent should roll up to OK.
),
TestMetadata(
foo_discard=True,
foo_status=DiagnosticStatus.OK,
bar_discard=False,
bar_status=DiagnosticStatus.OK,
agg_expected=DiagnosticStatus.OK,
),
]


class DiagnosticsTestNode(Node):
"""Class that publishes raw diagnostics and listens for aggregated diagnostics."""

def __init__(self, foo_status, bar_status, agg_expected):
super().__init__(node_name='diagnostics_listener_node')
self.foo_status = foo_status
self.bar_status = bar_status
self.agg_expected = agg_expected
self.agg_received = None
self.counter = 0
self.future = Future()
self.subscriber = self.create_subscription(
msg_type=DiagnosticArray,
topic='/diagnostics_agg',
callback=self.diagnostics_aggregated_callback,
qos_profile=10,
)
self.publisher = self.create_publisher(
msg_type=DiagnosticArray, topic='/diagnostics', qos_profile=10
)
self.timer = self.create_timer(0.1, self.timer_callback)

def timer_callback(self):
"""Call from a timer to send off raw diagnostics."""
msg = DiagnosticArray()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'robot'
if self.foo_status is not None:
msg.status.append(
DiagnosticStatus(
name='/foo', level=self.foo_status, message='Foo', hardware_id='foo'
)
)
if self.bar_status is not None:
msg.status.append(
DiagnosticStatus(
name='/bar', level=self.bar_status, message='Bar', hardware_id='bar'
)
)
self.publisher.publish(msg)

def diagnostics_aggregated_callback(self, msg):
"""Call from a subscriber providing aggregated diagnostics."""
for status in msg.status:
if status.name == '/robot/agg':
self.agg_received = status.level
self.counter += 1
if self.agg_expected == status.level:
# Diagnostics may take a few iterations to 'settle' into the right
# state because of how the STALE logic is applied. So, we keep checking
# the aggregator result until it reaches the value we are expecting,
# and then trigger the future.
self.future.set_result(self.counter)


@pytest.fixture(scope='function')
def test_metadata(request):
"""Enable parameter indirection, so we can pass a parameterization into fixtures."""
return request.param


@pytest.fixture(scope='function')
def yaml_file(test_metadata):
"""Generate a YAML file to test a specific configuration state."""
with tempfile.NamedTemporaryFile(delete=False) as fp:
fp.write(
bytes(
f"""
diagnostic_aggregator:
ros__parameters:
path: 'robot'
pub_rate: 1.0
analyzers:
part:
type: 'diagnostic_aggregator/AnalyzerGroup'
path: 'agg'
timeout: 2.0
foo:
type: 'diagnostic_aggregator/GenericAnalyzer'
path: 'foo'
find_and_remove_prefix: ['/foo']
discard_stale: {test_metadata.foo_discard}
bar:
type: 'diagnostic_aggregator/GenericAnalyzer'
path: 'bar'
find_and_remove_prefix: ['/bar']
discard_stale: {test_metadata.bar_discard}
""",
'utf-8',
)
)
return fp.name


@pytest.fixture(scope='function')
def diagnostic_aggregator_node():
"""Declare an aggregator that uses a global configuration set by the launch."""
return launch_ros.actions.Node(
name='diagnostic_aggregator',
package='diagnostic_aggregator',
executable='aggregator_node',
)


@launch_pytest.fixture(scope='function')
def launch_description(yaml_file, diagnostic_aggregator_node):
"""Declare what should be launched in each test."""
return launch.LaunchDescription(
[
launch_ros.actions.SetParametersFromFile(yaml_file),
diagnostic_aggregator_node,
launch_pytest.actions.ReadyToTest(),
]
)


@pytest.mark.parametrize('test_metadata', TEST_METADATA, indirect=True)
@pytest.mark.launch(fixture=launch_description)
def test_discard_behavior(test_metadata, launch_context):
"""Run a launch test for each test in our set of tests."""
rclpy.init()

node = DiagnosticsTestNode(
foo_status=test_metadata.foo_status,
bar_status=test_metadata.bar_status,
agg_expected=test_metadata.agg_expected,
)

executor = SingleThreadedExecutor()
executor.add_node(node)

try:
executor.spin_until_future_complete(future=node.future, timeout_sec=10.0)
print(
f"""
The test produced the following result:
+ foo_level: {test_metadata.foo_status} (discard: {test_metadata.foo_discard})
+ bar_level: {test_metadata.bar_status} (discard: {test_metadata.bar_discard})
Expected level: {test_metadata.agg_expected}
Received level: {node.agg_received}
"""
)
assert node.future.done(), 'Launch timed out without producing aggregation'
assert (
node.agg_received == test_metadata.agg_expected
), 'Unexpected parent status level'
print(f'It took {node.future.result()} aggregations to find the correct status')

finally:
rclpy.try_shutdown()

0 comments on commit 1401f48

Please sign in to comment.