From ba4a776f756537e174559a8a5db28a2260c6f866 Mon Sep 17 00:00:00 2001 From: Sascha Arnold Date: Wed, 16 Oct 2024 09:45:04 +0200 Subject: [PATCH] Adds an input aligner filter (#148) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Alejandro Hernández Cordero Co-authored-by: Alejandro Hernández Cordero --- CMakeLists.txt | 5 + doc/index.rst | 31 ++ include/message_filters/input_aligner.hpp | 436 ++++++++++++++++++++++ test/test_input_aligner.cpp | 408 ++++++++++++++++++++ 4 files changed, 880 insertions(+) create mode 100644 include/message_filters/input_aligner.hpp create mode 100644 test/test_input_aligner.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 96f9ff7..d8d0f9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,6 +132,11 @@ if(BUILD_TESTING) target_link_libraries(${PROJECT_NAME}-test_message_traits ${PROJECT_NAME} rclcpp::rclcpp ${std_msgs_TARGETS}) endif() + ament_add_gtest(${PROJECT_NAME}-test_input_aligner test/test_input_aligner.cpp) + if(TARGET ${PROJECT_NAME}-test_input_aligner) + target_link_libraries(${PROJECT_NAME}-test_input_aligner ${PROJECT_NAME}) + endif() + # python tests with python interfaces of message filters find_package(ament_cmake_pytest REQUIRED) ament_add_pytest_test(directed.py "test/directed.py") diff --git a/doc/index.rst b/doc/index.rst index ce791b0..b7cc71c 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -14,6 +14,7 @@ The filters currently implemented in this package are: * :class:`message_filters.Cache` Caches messages which pass through it, allowing later lookup by time stamp. * :class:`message_filters.TimeSynchronizer` Synchronizes multiple messages by their timestamps, only passing them through when all have arrived. * :class:`message_filters.TimeSequencer` Tries to pass messages through ordered by their timestamps, even if some arrive out of order. + * :class:`message_filters.InputAligner` Synchronizes multiple messages by their timestamps, passing them through to individial callbacks in the right order. 1. Filter Pattern ----------------- @@ -222,6 +223,36 @@ It is possible to pass bare pointers in. These will not be automatically deleted std::shared_ptr > sub = c.getFilter >(sub_index); +8. Input Aligner +----------------- +* Python: the InputAligner filter is not yet implemented. + +The InputAligner filter aligns multiple inputs in time and passing them through in order. For N inputs this filter provides N outputs. Often sensors or pre-processing chains might introduce delays to messages until they arrive at a target node. The input aligner ensures that the messages are forwarded in order. + +8.1 Connections +~~~~~~~~~~~~~~~ +Input: + * C++: N separted filters, each of which is of the signature ``void callback(const std::shared_ptr&)``. The number of filters supported is determined by the number of template arguments the class was created with. +Output: + * C++: N separted filters, each of which is of the signature ``void callback(const std::shared_ptr&)``. The number of filters supported is determined by the number of template arguments the class was created with. + +8.2 Example (C++) +~~~~~~~~~~~~~~~~~ +.. code-block:: C++ + + message_filters::Subscriber sub0(node, "my_twist", 10); + message_filters::Subscriber sub1(node, "my_vector", 10); + message_filters::InputAligner aligner( + rclcpp::Duration(0.5), sub0, sub1); + aligner.registerCallback<0>(myTwistCallback); + aligner.registerCallback<1>(myVectorCallback); + // optinally give a hint about the input periods + aligner.setInputPeriod<0>(rclcpp::Duration(0.009)); + aligner.setInputPeriod<1>(rclcpp::Duration(0.045)); + // Setup dispatch timer (alternativly `aligner.dispatchMessages()` can be called as required) + aligner.setupDispatchTimer(node, rclcpp::Duration(0.01)); + + The message filter interface ---------------------------- diff --git a/include/message_filters/input_aligner.hpp b/include/message_filters/input_aligner.hpp new file mode 100644 index 0000000..9816c28 --- /dev/null +++ b/include/message_filters/input_aligner.hpp @@ -0,0 +1,436 @@ +// Copyright 2024, Kraken Robotics. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the Willow Garage nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#ifndef MESSAGE_FILTERS__INPUT_ALIGNER_HPP_ +#define MESSAGE_FILTERS__INPUT_ALIGNER_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "message_filters/connection.hpp" +#include "message_filters/message_traits.hpp" +#include "message_filters/null_types.hpp" +#include "message_filters/signal1.hpp" + +namespace message_filters +{ + +/** + * \brief Aligns multiple inputs in time and passing them through in order. + * For N inputs this filter provides N outputs. + * Often sensors or pre-processing chains might introduce delays + * to messages until they arrive at a target node. + * The input aligner ensures that the messages are forwarded in order. + * + * If the input messages are periodic a hint about their frequency can be + * set via `setInputPeriod` for each input. This allows the filter to look ahead + * for the given time and reduces delays. + * The period should be selected optimistically at the lower bound. + * + * If for an input no messages arrive the input will timeout and will be ignored until + * new messages arrive. + * + * Forwarding messages can be either triggered by calling `dispatchMessages` periodically + * or by setting up a timer with `setupDispatchTimer`. + * + * This implementation was inspired by ROCK's stream aligner. + * https://www.rock-robotics.org/documentation/data_processing/stream_aligner.html + */ +template +class InputAligner : public noncopyable +{ +public: + static constexpr std::size_t N_INPUTS = sizeof...(Ms); + typedef std::tuple Messages; + typedef std::tuple...> Events; + typedef std::tuple...> Signals; + + /** + * \brief Status information of a input queue + */ + struct QueueStatus + { + bool active; + std::size_t queue_size; + std::size_t msgs_processed; + std::size_t msgs_dropped; + }; + + template + InputAligner(const rclcpp::Duration & timeout, F0 & f0, F1 & f1, Fs &... fs) + : timeout_(timeout) + , last_in_ts_(0, 0, RCL_ROS_TIME) + , last_out_ts_(0, 0, RCL_ROS_TIME) + { + connectInput(f0, f1, fs ...); + } + + explicit InputAligner(const rclcpp::Duration & timeout) + : timeout_(timeout) + , last_in_ts_(0, 0, RCL_ROS_TIME) + , last_out_ts_(0, 0, RCL_ROS_TIME) + { + } + + virtual ~InputAligner() + { + disconnectAll(); + } + + /** + * \brief Connects |Fs| inputs. + * Disconnects existing inputs. + */ + template + void connectInput(Fs &... fs) + { + disconnectAll(); + + std::tuple tuple{fs ...}; + connectInputImpl(tuple, std::make_index_sequence{}); + } + + /** + * \brief Register a callback for the I-th input. + * \param callback The callback to call + */ + template + Connection registerCallback(const std::function & callback) + { + using Message = typename std::tuple_element_t; + using Signal = typename std::tuple_element_t; + typename CallbackHelper1::Ptr helper = std::get(signals_).addCallback(callback); + return Connection(std::bind(&Signal::removeCallback, &std::get(signals_), helper)); + } + + /** + * \brief Register a callback for the I-th input. + * \param callback The callback to call + */ + template + Connection registerCallback(void (* callback)(P)) + { + using Message = typename std::tuple_element_t; + using Signal = typename std::tuple_element_t; + typename CallbackHelper1::Ptr helper = + std::get(signals_).template addCallback

(std::bind(callback, std::placeholders::_1)); + return Connection(std::bind(&Signal::removeCallback, &std::get(signals_), helper)); + } + + /** + * \brief Register a callback for the I-th input. + * \param callback The callback to call + */ + template + Connection registerCallback(void (T::* callback)(P), T * t) + { + using Message = typename std::tuple_element_t; + using Signal = typename std::tuple_element_t; + typename CallbackHelper1::Ptr helper = + std::get(signals_).template addCallback

(std::bind(callback, t, std::placeholders::_1)); + return Connection(std::bind(&Signal::removeCallback, &std::get(signals_), helper)); + } + + /** + * \brief Set the name of this filter. For debugging use. + */ + void setName(const std::string & name) {name_ = name;} + + /** + * \brief Get the name of this filter. For debugging use. + */ + const std::string & getName() {return name_;} + + /** + * \brief Adds a message to the I-th input queue. + * With this function this filter can also be used without the use of other filter classes. + */ + template + void add(const std::shared_ptr const> & msg) + { + this->template addEvent(typename std::tuple_element_t(msg)); + } + + /** + * \brief Sets the input period for the I-th input. + * This allows the filter to look ahead for the given time and reduces delays. + * The period should be selected optimistically at the lower bound. + * The default value is 0, which means there is no look ahead for this input. + */ + template + void setInputPeriod(const rclcpp::Duration & period) + { + std::get(event_queues_).setPeriod(period); + } + + template + QueueStatus getQueueStatus() const + { + return std::get(event_queues_).getStatus(); + } + + /** + * \brief Sets up a timer calling the `dispatchMessages` function. + * \param node The ROS node. + * \param update_rate The rate in which `dispatchMessages` is triggered. + */ + template + void setupDispatchTimer(std::shared_ptr node, const rclcpp::Duration & update_rate) + { + dispatch_timer_ = node->create_wall_timer( + std::chrono::nanoseconds(update_rate.nanoseconds()), [this]() { + dispatchMessages(); + }); + } + + /** + * \brief Forwards all messages that can be aligned. + * This function should be called periodically. + */ + void dispatchMessages() + { + auto index_sequence = std::make_index_sequence{}; + + std::lock_guard lock(mutex_); + + // check if msgs are available + if (inputsAvailable(index_sequence)) { + bool input_available = true; + // iterate event queues as long as there are msgs to dispatch + while (input_available) { + input_available = dispatchFirstMessage(index_sequence); + } + } + } + +protected: + template + struct EventSort + { + bool operator()( + const MessageEvent & lhs, const MessageEvent & rhs) const + { + return message_filters::message_traits::TimeStamp::value(*lhs.getConstMessage()) < + message_filters::message_traits::TimeStamp::value(*rhs.getConstMessage()); + } + }; + + template + class EventQueue + : public std::multiset, + EventSort> + { +public: + EventQueue() + : next_ts_(rclcpp::Time::max(RCL_ROS_TIME)), period_(0, 0), + active_(false), msgs_processed_(0), msgs_dropped_(0) {} + + rclcpp::Time firstTimeStamp() + { + if (!this->empty()) { + rclcpp::Time first_ts = + message_filters::message_traits::TimeStamp::value( + *(this->begin()->getConstMessage())); + next_ts_ = first_ts + period_; + active_ = true; + return first_ts; + } else if (active_) { + return next_ts_; + } + return rclcpp::Time::max(RCL_ROS_TIME); + } + + void popFirst() + { + this->erase(this->begin()); + msgs_processed_++; + } + + void msgDropped() + { + msgs_dropped_++; + } + + void setPeriod(const rclcpp::Duration & period) + { + period_ = period; + } + + void setActive(bool active) + { + active_ = active; + } + + QueueStatus getStatus() const + { + return QueueStatus{active_, this->size(), msgs_processed_, msgs_dropped_}; + } + +protected: + rclcpp::Time next_ts_; + rclcpp::Duration period_; + bool active_; + std::size_t msgs_processed_; + std::size_t msgs_dropped_; + }; + + typedef std::tuple...> EventQueues; + + template + void connect(FTuple & ftuple) + { + using MEvent = typename std::tuple_element_t; + input_connections_[I] = + std::get(ftuple).registerCallback( + std::function( + std::bind( + &InputAligner::template addEvent, this, std::placeholders::_1))); + } + + template + void connectInputImpl(FTuple & ftuple, std::index_sequence) + { + (connect(ftuple), ...); + } + + void disconnectAll() + { + for (auto & input_connection : input_connections_) { + input_connection.disconnect(); + } + } + + template + void addEvent(const typename std::tuple_element_t & evt) + { + using Message = typename std::tuple_element_t; + + rclcpp::Time msg_timestamp = + message_filters::message_traits::TimeStamp::value(*evt.getConstMessage()); + + std::lock_guard lock(mutex_); + auto & event_queue = std::get(event_queues_); + + // evaluate age of msg + if (msg_timestamp < last_out_ts_) { + event_queue.msgDropped(); + RCUTILS_LOG_WARN("Messages of type %ld arrived too late and will be dropped", I); + return; + } + + // set latest ts + if (msg_timestamp > last_in_ts_) { + last_in_ts_ = msg_timestamp; + } + + // add new event to queue + event_queue.insert(evt); + } + + template + bool inputsAvailable(std::index_sequence) const + { + /* *INDENT-OFF* */ + // uncrustify messes with the brackets which are required (at least by GCC) + return (!std::get(event_queues_).empty() || ...); + /* *INDENT-ON* */ + } + + template + bool dispatchFirstMessage(std::index_sequence) + { + // get index of first sample + const std::array ts_array = + {std::get(event_queues_).firstTimeStamp()...}; + const std::size_t idx = getFirstSampleIdx(ts_array); + + // forward first message + /* *INDENT-OFF* */ + // uncrustify messes with the brackets which are required (at least by GCC) + return (dispatchFirstMessage(idx) || ...); + /* *INDENT-ON* */ + } + + template + bool dispatchFirstMessage(std::size_t idx) + { + using MEvent = typename std::tuple_element_t; + using Message = typename std::tuple_element_t; + + if (idx == I) { + auto & event_queue = std::get(event_queues_); + if (!event_queue.empty()) { + // dispatch first event + const MEvent & evt = *event_queue.begin(); + last_out_ts_ = + message_filters::message_traits::TimeStamp::value(*evt.getConstMessage()); + std::get(signals_).call(evt); + event_queue.popFirst(); + return true; + } else if ((last_in_ts_ - event_queue.firstTimeStamp()) >= timeout_) { + // timeout exceeded + event_queue.setActive(false); + return true; + } + } + // timeout not exceeded or queue not selected + return false; + } + + template + std::size_t getFirstSampleIdx(const std::array & ts) const + { + return std::distance(ts.begin(), std::min_element(ts.begin(), ts.end())); + } + + rclcpp::Duration timeout_; + rclcpp::Time last_in_ts_; + rclcpp::Time last_out_ts_; + + EventQueues event_queues_; + Connection input_connections_[N_INPUTS]; + Signals signals_; + std::string name_; + std::mutex mutex_; + rclcpp::TimerBase::SharedPtr dispatch_timer_; +}; + +} // namespace message_filters + +#endif // MESSAGE_FILTERS__INPUT_ALIGNER_HPP_ diff --git a/test/test_input_aligner.cpp b/test/test_input_aligner.cpp new file mode 100644 index 0000000..3e5a3a0 --- /dev/null +++ b/test/test_input_aligner.cpp @@ -0,0 +1,408 @@ +// Copyright 2024, Kraken Robotics. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the Willow Garage nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include "message_filters/input_aligner.hpp" + +class InputAligner; + +struct Header +{ + rclcpp::Time stamp; +}; + +struct Msg1 +{ + Header header; + int data; +}; + +struct Msg2 +{ + Header header; + int data; +}; + +namespace message_filters +{ +namespace message_traits +{ +template +struct HasHeader::value>::type>: public std::true_type {}; +} // namespace message_traits +} // namespace message_filters + +class InputAlignerTest : public ::testing::Test +{ +public: + InputAlignerTest() + : timeout_(1, 0), update_rate_(0, 1000000) + { + } + + virtual void SetUp() + { + // Shutdown in case there was a dangling global context from other test fixtures + rclcpp::shutdown(); + rclcpp::init(0, nullptr); + + node_ = std::make_shared("test_node"); + timeout_ = rclcpp::Duration(1, 0); + update_rate_ = rclcpp::Duration(0, 10000000); + } + + void TearDown() + { + node_.reset(); + rclcpp::shutdown(); + } + + template + void cb(const std::shared_ptr & msg) + { + cb_content_.push_back(msg->data); + } + + template + std::shared_ptr createMsg(const std::chrono::nanoseconds & ns, int data) + { + auto msg = std::make_shared(); + msg->header.stamp = rclcpp::Time(ns.count(), RCL_ROS_TIME); + msg->data = data; + return msg; + } + + rclcpp::Node::SharedPtr node_; + rclcpp::Duration timeout_; + rclcpp::Duration update_rate_; + std::vector cb_content_; +}; + +TEST_F(InputAlignerTest, init) +{ + message_filters::NullFilter f0, f1; + message_filters::NullFilter f2, f3; + message_filters::InputAligner aligner1(timeout_, f0, f1, f2, f3); + + message_filters::InputAligner aligner2(timeout_); + aligner2.connectInput(f0, f2, f3); +} + + +TEST_F(InputAlignerTest, dispatch_inputs_in_order) +{ + message_filters::InputAligner aligner(timeout_); + + // register callbacks + InputAlignerTest * test_fixture = dynamic_cast(this); + aligner.registerCallback<0>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<1>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<2>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<3>(&InputAlignerTest::cb, test_fixture); + + // set period for all inputs to one millisecond + aligner.setInputPeriod<0>(rclcpp::Duration(0, static_cast(4e6))); + aligner.setInputPeriod<1>(rclcpp::Duration(0, static_cast(4e6))); + aligner.setInputPeriod<2>(rclcpp::Duration(0, static_cast(4e6))); + aligner.setInputPeriod<3>(rclcpp::Duration(0, static_cast(4e6))); + + // send msgs in unaligned order + aligner.add<2>(createMsg(std::chrono::milliseconds(3), 3)); + aligner.add<0>(createMsg(std::chrono::milliseconds(1), 1)); + aligner.add<2>(createMsg(std::chrono::milliseconds(7), 7)); + aligner.add<0>(createMsg(std::chrono::milliseconds(5), 5)); + aligner.add<3>(createMsg(std::chrono::milliseconds(2), 2)); + aligner.add<0>(createMsg(std::chrono::milliseconds(9), 9)); + aligner.add<1>(createMsg(std::chrono::milliseconds(4), 4)); + aligner.add<1>(createMsg(std::chrono::milliseconds(8), 8)); + aligner.add<3>(createMsg(std::chrono::milliseconds(6), 6)); + + // dispatch messages + aligner.dispatchMessages(); + + // compare msg output + ASSERT_EQ(cb_content_.size(), 9); + for (size_t i = 0; i < cb_content_.size(); i++) { + EXPECT_EQ(cb_content_[i], i + 1); + } +} + +TEST_F(InputAlignerTest, ignores_inactive_inputs) +{ + message_filters::InputAligner aligner(timeout_); + + // register callbacks + InputAlignerTest * test_fixture = dynamic_cast(this); + aligner.registerCallback<0>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<1>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<2>(&InputAlignerTest::cb, test_fixture); + + // set period for all inputs to one millisecond + aligner.setInputPeriod<0>(rclcpp::Duration(0, static_cast(2e6))); + aligner.setInputPeriod<1>(rclcpp::Duration(0, static_cast(2e6))); + aligner.setInputPeriod<2>(rclcpp::Duration(0, static_cast(2e6))); + + // send msgs in unaligned order only to two inputs + aligner.add<2>(createMsg(std::chrono::milliseconds(2), 2)); + aligner.add<1>(createMsg(std::chrono::milliseconds(1), 1)); + aligner.add<2>(createMsg(std::chrono::milliseconds(4), 4)); + aligner.add<1>(createMsg(std::chrono::milliseconds(3), 3)); + aligner.add<1>(createMsg(std::chrono::milliseconds(5), 5)); + + // dispatch messages + aligner.dispatchMessages(); + + // compare msg output + ASSERT_EQ(cb_content_.size(), 5); + for (size_t i = 0; i < cb_content_.size(); i++) { + EXPECT_EQ(cb_content_[i], i + 1); + } +} + +TEST_F(InputAlignerTest, input_timeout) +{ + // set timeout of 10 ms + timeout_ = rclcpp::Duration(0, static_cast(1e7)); + message_filters::InputAligner aligner(timeout_); + + // register callbacks + InputAlignerTest * test_fixture = dynamic_cast(this); + aligner.registerCallback<0>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<1>(&InputAlignerTest::cb, test_fixture); + + // set period for all inputs to 2 milliseconds + aligner.setInputPeriod<0>(rclcpp::Duration(0, static_cast(2e6))); + aligner.setInputPeriod<1>(rclcpp::Duration(0, static_cast(2e6))); + + // adds 8 msgs on input 0 + for (int i = 1; i < 17; i = i + 2) { + aligner.add<0>(createMsg(std::chrono::milliseconds(i), i)); + } + // adds 2 msgs on input 1 + aligner.add<1>(createMsg(std::chrono::milliseconds(2), 2)); + aligner.add<1>(createMsg(std::chrono::milliseconds(4), 4)); + + // dispatch messages + aligner.dispatchMessages(); + + // compare msg output + // only msgs < 6 ms are forwarded, it will wait for msgs on input 1 + ASSERT_EQ(cb_content_.size(), 5); + for (size_t i = 0; i < cb_content_.size(); i++) { + EXPECT_EQ(cb_content_[i], i + 1); + } + + // a new msg with a time stamp > 10 ms from the + // last forwarded one will trigger the timeout for input 1 + aligner.add<0>(createMsg(std::chrono::milliseconds(17), 17)); + + // dispatch messages + aligner.dispatchMessages(); + ASSERT_EQ(cb_content_.size(), 11); + EXPECT_EQ(cb_content_[8], 13); + EXPECT_EQ(cb_content_[9], 15); + EXPECT_EQ(cb_content_[10], 17); +} + +TEST_F(InputAlignerTest, drops_msgs) +{ + message_filters::InputAligner aligner(timeout_); + + // register callbacks + InputAlignerTest * test_fixture = dynamic_cast(this); + aligner.registerCallback<0>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<1>(&InputAlignerTest::cb, test_fixture); + + // set period for all inputs to 2 milliseconds + aligner.setInputPeriod<0>(rclcpp::Duration(0, static_cast(2e6))); + aligner.setInputPeriod<1>(rclcpp::Duration(0, static_cast(2e6))); + + + aligner.add<1>(createMsg(std::chrono::milliseconds(4), 4)); + aligner.add<0>(createMsg(std::chrono::milliseconds(3), 3)); + + // dispatch messages + aligner.dispatchMessages(); + + ASSERT_EQ(cb_content_.size(), 2); + for (size_t i = 0; i < cb_content_.size(); i++) { + EXPECT_EQ(cb_content_[i], i + 3); + } + + // add also msg backwards in time + aligner.add<0>(createMsg(std::chrono::milliseconds(1), 1)); + aligner.add<0>(createMsg(std::chrono::milliseconds(5), 5)); + aligner.add<0>(createMsg(std::chrono::milliseconds(7), 7)); + aligner.add<1>(createMsg(std::chrono::milliseconds(2), 2)); + aligner.add<1>(createMsg(std::chrono::milliseconds(6), 6)); + + // dispatch messages + aligner.dispatchMessages(); + + ASSERT_EQ(cb_content_.size(), 5); + for (size_t i = 0; i < cb_content_.size(); i++) { + EXPECT_EQ(cb_content_[i], i + 3); + } +} + +TEST_F(InputAlignerTest, dispatch_by_timer) +{ + message_filters::InputAligner aligner(timeout_); + aligner.setupDispatchTimer(node_, update_rate_); + + // register callbacks + InputAlignerTest * test_fixture = dynamic_cast(this); + aligner.registerCallback<0>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<1>(&InputAlignerTest::cb, test_fixture); + + // set period for all inputs to 2 milliseconds + aligner.setInputPeriod<0>(rclcpp::Duration(0, static_cast(2e6))); + aligner.setInputPeriod<1>(rclcpp::Duration(0, static_cast(2e6))); + + aligner.add<1>(createMsg(std::chrono::milliseconds(2), 2)); + aligner.add<0>(createMsg(std::chrono::milliseconds(1), 1)); + + rclcpp::Rate(50).sleep(); + rclcpp::spin_some(node_); + + ASSERT_EQ(cb_content_.size(), 2); + for (size_t i = 0; i < cb_content_.size(); i++) { + EXPECT_EQ(cb_content_[i], i + 1); + } +} + +TEST_F(InputAlignerTest, no_period_information) +{ + // set timeout of 10 ms + timeout_ = rclcpp::Duration(0, static_cast(1e7)); + message_filters::InputAligner aligner(timeout_); + + // register callbacks + InputAlignerTest * test_fixture = dynamic_cast(this); + aligner.registerCallback<0>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<1>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<2>(&InputAlignerTest::cb, test_fixture); + + // send msgs in unaligned order only to two inputs + aligner.add<0>(createMsg(std::chrono::milliseconds(6), 6)); + aligner.add<2>(createMsg(std::chrono::milliseconds(2), 2)); + aligner.add<2>(createMsg(std::chrono::milliseconds(4), 4)); + aligner.add<1>(createMsg(std::chrono::milliseconds(1), 1)); + aligner.add<1>(createMsg(std::chrono::milliseconds(3), 3)); + aligner.add<1>(createMsg(std::chrono::milliseconds(5), 5)); + + // dispatch messages + aligner.dispatchMessages(); + + // the last two messages won't get dispatched + ASSERT_EQ(cb_content_.size(), 4); + for (size_t i = 0; i < cb_content_.size(); i++) { + EXPECT_EQ(cb_content_[i], i + 1); + } + + // trigger timeout on all other inputs to flush msgs + aligner.add<0>(createMsg(std::chrono::milliseconds(16), 16)); + + // dispatch messages + aligner.dispatchMessages(); + ASSERT_EQ(cb_content_.size(), 7); +} + +TEST_F(InputAlignerTest, get_queue_status) +{ + // set timeout of 10 ms + timeout_ = rclcpp::Duration(0, static_cast(1e7)); + message_filters::InputAligner aligner(timeout_); + + // register callbacks + InputAlignerTest * test_fixture = dynamic_cast(this); + aligner.registerCallback<0>(&InputAlignerTest::cb, test_fixture); + aligner.registerCallback<1>(&InputAlignerTest::cb, test_fixture); + + // set period for all inputs to 2 milliseconds + aligner.setInputPeriod<0>(rclcpp::Duration(0, static_cast(2e6))); + aligner.setInputPeriod<1>(rclcpp::Duration(0, static_cast(2e6))); + + aligner.add<1>(createMsg(std::chrono::milliseconds(2), 2)); + aligner.add<0>(createMsg(std::chrono::milliseconds(3), 3)); + aligner.add<0>(createMsg(std::chrono::milliseconds(5), 5)); + + auto status_0 = aligner.getQueueStatus<0>(); + EXPECT_FALSE(status_0.active); + EXPECT_EQ(status_0.queue_size, 2); + EXPECT_EQ(status_0.msgs_processed, 0); + EXPECT_EQ(status_0.msgs_dropped, 0); + auto status_1 = aligner.getQueueStatus<1>(); + EXPECT_FALSE(status_1.active); + EXPECT_EQ(status_1.queue_size, 1); + EXPECT_EQ(status_1.msgs_processed, 0); + EXPECT_EQ(status_1.msgs_dropped, 0); + + // dispatch messages + aligner.dispatchMessages(); + + status_0 = aligner.getQueueStatus<0>(); + EXPECT_TRUE(status_0.active); + EXPECT_EQ(status_0.queue_size, 1); + EXPECT_EQ(status_0.msgs_processed, 1); + EXPECT_EQ(status_0.msgs_dropped, 0); + status_1 = aligner.getQueueStatus<1>(); + EXPECT_TRUE(status_1.active); + EXPECT_EQ(status_1.queue_size, 0); + EXPECT_EQ(status_1.msgs_processed, 1); + EXPECT_EQ(status_1.msgs_dropped, 0); + + // adds sample outside of filter scope + aligner.add<0>(createMsg(std::chrono::milliseconds(1), 1)); + // adds a sample ahead forcing input 1 to time out + aligner.add<0>(createMsg(std::chrono::milliseconds(17), 17)); + + // dispatch messages + aligner.dispatchMessages(); + + status_0 = aligner.getQueueStatus<0>(); + EXPECT_TRUE(status_0.active); + EXPECT_EQ(status_0.queue_size, 0); + EXPECT_EQ(status_0.msgs_processed, 3); + EXPECT_EQ(status_0.msgs_dropped, 1); + status_1 = aligner.getQueueStatus<1>(); + EXPECT_FALSE(status_1.active); + EXPECT_EQ(status_1.queue_size, 0); + EXPECT_EQ(status_1.msgs_processed, 1); + EXPECT_EQ(status_1.msgs_dropped, 0); +}