diff --git a/fastddsspy_participants/include/fastddsspy_participants/model/DataStreamer.hpp b/fastddsspy_participants/include/fastddsspy_participants/model/DataStreamer.hpp index e8b85c92..0944292d 100644 --- a/fastddsspy_participants/include/fastddsspy_participants/model/DataStreamer.hpp +++ b/fastddsspy_participants/include/fastddsspy_participants/model/DataStreamer.hpp @@ -54,6 +54,7 @@ class DataStreamer : public TopicRateCalculator FASTDDSSPY_PARTICIPANTS_DllAPI bool activate( const ddspipe::core::types::WildcardDdsFilterTopic& topic_to_activate, + const std::set& topics, const std::shared_ptr& callback); FASTDDSSPY_PARTICIPANTS_DllAPI @@ -71,20 +72,20 @@ class DataStreamer : public TopicRateCalculator FASTDDSSPY_PARTICIPANTS_DllAPI bool is_topic_type_discovered( - const ddspipe::core::types::WildcardDdsFilterTopic& topic_to_activate) const noexcept; + const ddspipe::core::types::DdsTopic& topic_to_activate) const noexcept; FASTDDSSPY_PARTICIPANTS_DllAPI - bool is_topic_type_discovered( - const ddspipe::core::types::DdsTopic& topic_to_activate) const noexcept; + bool is_any_topic_type_discovered( + const std::set& topics) const noexcept; protected: - bool is_topic_type_discovered_nts_( - const ddspipe::core::types::WildcardDdsFilterTopic& topic_to_activate) const noexcept; - bool is_topic_type_discovered_nts_( const ddspipe::core::types::DdsTopic& topic_to_activate) const noexcept; + bool is_any_topic_type_discovered_nts_( + const std::set& topics) const noexcept; + bool activated_ {false}; std::shared_ptr callback_; diff --git a/fastddsspy_participants/src/cpp/model/DataStreamer.cpp b/fastddsspy_participants/src/cpp/model/DataStreamer.cpp index 667d2ab4..24f34dcb 100644 --- a/fastddsspy_participants/src/cpp/model/DataStreamer.cpp +++ b/fastddsspy_participants/src/cpp/model/DataStreamer.cpp @@ -43,9 +43,10 @@ bool DataStreamer::activate_all( bool DataStreamer::activate( const ddspipe::core::types::WildcardDdsFilterTopic& topic_to_activate, + const std::set& topics, const std::shared_ptr& callback) { - if (!is_topic_type_discovered(topic_to_activate)) + if (!is_any_topic_type_discovered(topics)) { EPROSIMA_LOG_WARNING(FASTDDSSPY_DATASTREAMER, "Type <" << topic_to_activate.type_name << @@ -157,40 +158,38 @@ void DataStreamer::add_data( } bool DataStreamer::is_topic_type_discovered( - const ddspipe::core::types::WildcardDdsFilterTopic& filter_topic) const noexcept + const ddspipe::core::types::DdsTopic& topic) const noexcept { std::shared_lock _(mutex_); - return is_topic_type_discovered_nts_(filter_topic); + return is_topic_type_discovered_nts_(topic); } bool DataStreamer::is_topic_type_discovered_nts_( - const ddspipe::core::types::WildcardDdsFilterTopic& filter_topic) const noexcept + const ddspipe::core::types::DdsTopic& topic) const noexcept { - ddspipe::core::types::DdsTopic topic; - for (const auto& it : topic_type_discovered_) - { - topic.m_topic_name = it.first; - topic.type_name = it.second; - if (filter_topic.matches(topic) == true) - { - return true; - } - } - - return false; + return types_discovered_.find(topic.type_name) != types_discovered_.end(); } -bool DataStreamer::is_topic_type_discovered( - const ddspipe::core::types::DdsTopic& topic) const noexcept +bool DataStreamer::is_any_topic_type_discovered( + const std::set& topics) const noexcept { std::shared_lock _(mutex_); - return is_topic_type_discovered_nts_(topic); + return is_any_topic_type_discovered_nts_(topics); } -bool DataStreamer::is_topic_type_discovered_nts_( - const ddspipe::core::types::DdsTopic& topic) const noexcept +bool DataStreamer::is_any_topic_type_discovered_nts_( + const std::set& topics) const noexcept { - return types_discovered_.find(topic.type_name) != types_discovered_.end(); + for (const auto& topic : topics) + { + if (types_discovered_.find(topic.type_name) != types_discovered_.end()) + { + // If there's at least one topic that matches the filter topic return true + return true; + } + } + + return false; } } /* namespace participants */ diff --git a/fastddsspy_participants/test/unittest/model/DataStreamerTest.cpp b/fastddsspy_participants/test/unittest/model/DataStreamerTest.cpp index 6e143179..dcbd99f3 100644 --- a/fastddsspy_participants/test/unittest/model/DataStreamerTest.cpp +++ b/fastddsspy_participants/test/unittest/model/DataStreamerTest.cpp @@ -36,26 +36,32 @@ fastdds::dds::DynamicType::_ref_type create_schema( return dynamic_type_topic; } -// TEST(DataStreamerTest, activate_false) -// { -// spy::participants::DataStreamer ds; -// ddspipe::core::types::WildcardDdsFilterTopic filter_topic; -// filter_topic.topic_name = std::string("topic1"); -// filter_topic.type_name = std::string("type1"); +TEST(DataStreamerTest, activate_false) +{ + std::set network_topics; + spy::participants::DataStreamer ds; -// std::shared_ptr cb = -// std::make_shared(); + ddspipe::core::types::WildcardDdsFilterTopic filter_topic; + filter_topic.topic_name = std::string("topic1"); + filter_topic.type_name = std::string("type1"); -// ASSERT_FALSE(ds.activate(filter_topic, cb)); -// } + std::shared_ptr cb = + std::make_shared(); + + ASSERT_FALSE(ds.activate(filter_topic, network_topics, cb)); +} TEST(DataStreamerTest, activate_true) { + std::set network_topics; spy::participants::DataStreamer ds; + ddspipe::core::types::DdsTopic topic; topic.m_topic_name = "topic1"; topic.type_name = "type1"; + network_topics.insert(topic); + std::shared_ptr cb = std::make_shared(); @@ -69,17 +75,20 @@ TEST(DataStreamerTest, activate_true) filter_topic.topic_name = topic.m_topic_name; filter_topic.type_name = topic.type_name; - ASSERT_TRUE(ds.activate(filter_topic, cb)); + ASSERT_TRUE(ds.activate(filter_topic, network_topics, cb)); } TEST(DataStreamerTest, activate_twice) { + std::set network_topics; spy::participants::DataStreamer ds; ddspipe::core::types::DdsTopic topic_1; topic_1.m_topic_name = "topic1"; topic_1.type_name = "type1"; + network_topics.insert(topic_1); + std::shared_ptr cb = std::make_shared(); @@ -89,19 +98,29 @@ TEST(DataStreamerTest, activate_twice) fastdds::dds::xtypes::TypeIdentifier type_identifier_1; ds.add_schema(dynamic_type_topic_1, type_identifier_1); + ddspipe::core::types::WildcardDdsFilterTopic filter_topic_1; + filter_topic_1.topic_name = topic_1.m_topic_name; + filter_topic_1.type_name = topic_1.type_name; + ddspipe::core::types::DdsTopic topic_2; topic_2.m_topic_name = "topic2"; topic_2.type_name = "type2"; + network_topics.insert(topic_2); + fastdds::dds::DynamicType::_ref_type dynamic_type_topic_2; dynamic_type_topic_2 = create_schema(topic_2); fastdds::dds::xtypes::TypeIdentifier type_identifier_2; ds.add_schema(dynamic_type_topic_2, type_identifier_2); + ddspipe::core::types::WildcardDdsFilterTopic filter_topic_2; + filter_topic_2.topic_name = topic_2.m_topic_name; + filter_topic_2.type_name = topic_2.type_name; + // is this the correct behaviour? - ASSERT_TRUE(ds.activate(topic_1, cb)); - ASSERT_TRUE(ds.activate(topic_2, cb)); + ASSERT_TRUE(ds.activate(filter_topic_1, network_topics, cb)); + ASSERT_TRUE(ds.activate(filter_topic_2, network_topics, cb)); } TEST(DataStreamerTest, topic_type_discovered) @@ -127,164 +146,193 @@ TEST(DataStreamerTest, topic_type_discovered) ASSERT_TRUE(ds.is_topic_type_discovered(topic_2)); } -// TEST(DataStreamerTest, deactivate) -// { -// spy::participants::DataStreamer ds; -// ddspipe::core::types::DdsTopic topic; -// topic.m_topic_name = "topic1"; -// topic.type_name = "type1"; +TEST(DataStreamerTest, deactivate) +{ + std::set network_topics; + spy::participants::DataStreamer ds; + + ddspipe::core::types::DdsTopic topic; + topic.m_topic_name = "topic1"; + topic.type_name = "type1"; + + std::atomic data_sent(0); + + std::shared_ptr cb = + std::make_shared( + [&data_sent] + (const ddspipe::core::types::DdsTopic& topic, + const fastdds::dds::DynamicType::_ref_type& type, + const ddspipe::core::types::RtpsPayloadData& data) + { + data_sent++; + }); + + fastdds::dds::DynamicType::_ref_type dynamic_type_topic; + dynamic_type_topic = create_schema(topic); + + fastdds::dds::xtypes::TypeIdentifier type_identifier; + ds.add_schema(dynamic_type_topic, type_identifier); + + network_topics.insert(topic); + + ddspipe::core::types::WildcardDdsFilterTopic filter_topic; + filter_topic.topic_name = topic.m_topic_name; + filter_topic.type_name = topic.type_name; + + ds.activate(filter_topic, network_topics, cb); -// std::atomic data_sent(0); + ddspipe::core::types::RtpsPayloadData data; -// std::shared_ptr cb = -// std::make_shared( -// [&data_sent] -// (const ddspipe::core::types::DdsTopic& topic, -// const fastdds::dds::DynamicType::_ref_type& type, -// const ddspipe::core::types::RtpsPayloadData& data) -// { -// data_sent++; -// }); + unsigned int rand_1 = rand() % 20; -// fastdds::dds::DynamicType::_ref_type dynamic_type_topic; -// dynamic_type_topic = create_schema(topic); + for (unsigned int i = 0; i < rand_1; i++) + { + ds.add_data(topic, data); + } -// fastdds::dds::xtypes::TypeIdentifier type_identifier; -// ds.add_schema(dynamic_type_topic, type_identifier); + ds.deactivate(); -// ds.activate(topic, cb); + unsigned int rand_2 = rand() % 20; -// ddspipe::core::types::RtpsPayloadData data; + for (unsigned int i = 0; i < rand_2; i++) + { + ds.add_data(topic, data); + } -// unsigned int rand_1 = rand() % 20; + ASSERT_EQ(data_sent, rand_1); +} -// for (unsigned int i = 0; i < rand_1; i++) -// { -// ds.add_data(topic, data); -// } +TEST(DataStreamerTest, add_data) +{ + std::set network_topics; + spy::participants::DataStreamer ds; -// ds.deactivate(); + ddspipe::core::types::DdsTopic topic; + topic.m_topic_name = "topic1"; + topic.type_name = "type1"; -// unsigned int rand_2 = rand() % 20; + std::atomic data_sent(0); -// for (unsigned int i = 0; i < rand_2; i++) -// { -// ds.add_data(topic, data); -// } + std::shared_ptr cb = + std::make_shared( + [&data_sent] + (const ddspipe::core::types::DdsTopic& topic, + const fastdds::dds::DynamicType::_ref_type& type, + const ddspipe::core::types::RtpsPayloadData& data) + { + data_sent++; + }); -// ASSERT_EQ(data_sent, rand_1); -// } + fastdds::dds::DynamicType::_ref_type dynamic_type_topic; + dynamic_type_topic = create_schema(topic); -// TEST(DataStreamerTest, add_data) -// { -// spy::participants::DataStreamer ds; -// ddspipe::core::types::DdsTopic topic; -// topic.m_topic_name = "topic1"; -// topic.type_name = "type1"; + fastdds::dds::xtypes::TypeIdentifier type_identifier; + ds.add_schema(dynamic_type_topic, type_identifier); + + network_topics.insert(topic); + + ddspipe::core::types::WildcardDdsFilterTopic filter_topic; + filter_topic.topic_name = topic.m_topic_name; + filter_topic.type_name = topic.type_name; -// std::atomic data_sent(0); + ds.activate(filter_topic, network_topics, cb); -// std::shared_ptr cb = -// std::make_shared( -// [&data_sent] -// (const ddspipe::core::types::DdsTopic& topic, -// const fastdds::dds::DynamicType::_ref_type& type, -// const ddspipe::core::types::RtpsPayloadData& data) -// { -// data_sent++; -// }); + ddspipe::core::types::RtpsPayloadData data; -// fastdds::dds::DynamicType::_ref_type dynamic_type_topic; -// dynamic_type_topic = create_schema(topic); + unsigned int rand_1 = rand() % 20; -// fastdds::dds::xtypes::TypeIdentifier type_identifier; -// ds.add_schema(dynamic_type_topic, type_identifier); + for (unsigned int i = 0; i < rand_1; i++) + { + ds.add_data(topic, data); + } -// ds.activate(topic, cb); + ASSERT_EQ(data_sent, rand_1); +} -// ddspipe::core::types::RtpsPayloadData data; +TEST(DataStreamerTest, add_data_two_topics) +{ + std::set network_topics; + spy::participants::DataStreamer ds; -// unsigned int rand_1 = rand() % 20; + ddspipe::core::types::DdsTopic topic_1; + topic_1.m_topic_name = "topic1"; + topic_1.type_name = "type1"; -// for (unsigned int i = 0; i < rand_1; i++) -// { -// ds.add_data(topic, data); -// } + std::atomic data_sent_1(0); -// ASSERT_EQ(data_sent, rand_1); -// } + std::shared_ptr cb_1 = + std::make_shared( + [&data_sent_1] + (const ddspipe::core::types::DdsTopic& topic, + const fastdds::dds::DynamicType::_ref_type& type, + const ddspipe::core::types::RtpsPayloadData& data) + { + data_sent_1++; + }); -// TEST(DataStreamerTest, add_data_two_topics) -// { -// spy::participants::DataStreamer ds; + fastdds::dds::DynamicType::_ref_type dynamic_type_topic_1; + dynamic_type_topic_1 = create_schema(topic_1); -// ddspipe::core::types::DdsTopic topic_1; -// topic_1.m_topic_name = "topic1"; -// topic_1.type_name = "type1"; + fastdds::dds::xtypes::TypeIdentifier type_identifier_1; + ds.add_schema(dynamic_type_topic_1, type_identifier_1); -// std::atomic data_sent_1(0); + network_topics.insert(topic_1); -// std::shared_ptr cb_1 = -// std::make_shared( -// [&data_sent_1] -// (const ddspipe::core::types::DdsTopic& topic, -// const fastdds::dds::DynamicType::_ref_type& type, -// const ddspipe::core::types::RtpsPayloadData& data) -// { -// data_sent_1++; -// }); + ddspipe::core::types::WildcardDdsFilterTopic filter_topic_1; + filter_topic_1.topic_name = topic_1.m_topic_name; + filter_topic_1.type_name = topic_1.type_name; -// fastdds::dds::DynamicType::_ref_type dynamic_type_topic_1; -// dynamic_type_topic_1 = create_schema(topic_1); + ds.activate(filter_topic_1, network_topics, cb_1); -// fastdds::dds::xtypes::TypeIdentifier type_identifier_1; -// ds.add_schema(dynamic_type_topic_1, type_identifier_1); + ddspipe::core::types::DdsTopic topic_2; + topic_2.m_topic_name = "topic2"; + topic_2.type_name = "type2"; -// ds.activate(topic_1, cb_1); + std::atomic data_sent_2(0); -// ddspipe::core::types::DdsTopic topic_2; -// topic_2.m_topic_name = "topic2"; -// topic_2.type_name = "type2"; + std::shared_ptr cb_2 = + std::make_shared( + [&data_sent_2] + (const ddspipe::core::types::DdsTopic& topic, + const fastdds::dds::DynamicType::_ref_type& type, + const ddspipe::core::types::RtpsPayloadData& data) + { + data_sent_2++; + }); -// std::atomic data_sent_2(0); + fastdds::dds::DynamicType::_ref_type dynamic_type_topic_2; + dynamic_type_topic_2 = create_schema(topic_2); -// std::shared_ptr cb_2 = -// std::make_shared( -// [&data_sent_2] -// (const ddspipe::core::types::DdsTopic& topic, -// const fastdds::dds::DynamicType::_ref_type& type, -// const ddspipe::core::types::RtpsPayloadData& data) -// { -// data_sent_2++; -// }); + fastdds::dds::xtypes::TypeIdentifier type_identifier_2; + ds.add_schema(dynamic_type_topic_2, type_identifier_2); -// fastdds::dds::DynamicType::_ref_type dynamic_type_topic_2; -// dynamic_type_topic_2 = create_schema(topic_2); + network_topics.insert(topic_2); -// fastdds::dds::xtypes::TypeIdentifier type_identifier_2; -// ds.add_schema(dynamic_type_topic_2, type_identifier_2); + ddspipe::core::types::WildcardDdsFilterTopic filter_topic_2; + filter_topic_2.topic_name = topic_2.m_topic_name; + filter_topic_2.type_name = topic_2.type_name; -// ds.activate(topic_2, cb_2); + ds.activate(filter_topic_2, network_topics, cb_2); -// ddspipe::core::types::RtpsPayloadData data; + ddspipe::core::types::RtpsPayloadData data; -// unsigned int rand_1 = rand() % 20; + unsigned int rand_1 = rand() % 20; -// for (unsigned int i = 0; i < rand_1; i++) -// { -// ds.add_data(topic_1, data); -// } + for (unsigned int i = 0; i < rand_1; i++) + { + ds.add_data(topic_1, data); + } -// unsigned int rand_2 = rand() % 20; + unsigned int rand_2 = rand() % 20; -// for (unsigned int i = 0; i < rand_2; i++) -// { -// ds.add_data(topic_2, data); -// } + for (unsigned int i = 0; i < rand_2; i++) + { + ds.add_data(topic_2, data); + } -// ASSERT_FALSE(data_sent_1); -// ASSERT_EQ(data_sent_2, rand_2); -// } + ASSERT_FALSE(data_sent_1); + ASSERT_EQ(data_sent_2, rand_2); +} int main( int argc, diff --git a/fastddsspy_tool/src/cpp/tool/Controller.cpp b/fastddsspy_tool/src/cpp/tool/Controller.cpp index 93108550..710cc432 100644 --- a/fastddsspy_tool/src/cpp/tool/Controller.cpp +++ b/fastddsspy_tool/src/cpp/tool/Controller.cpp @@ -390,25 +390,6 @@ void Controller::print_command_( return; } - bool topic_available = false; - for (const auto& topic : topics) - { - topic_available = model_->is_topic_type_discovered(topic); - if (topic_available) - { - break; - } - } - - if (!topic_available) - { - view_.show_error(STR_ENTRY - << "Topic Type <" - << topic_name - << "> has not been discovered, and thus cannot print its data."); - return; - } - // Check if verbose is set bool verbose = false; if (arguments.size() >= 3) @@ -446,16 +427,8 @@ void Controller::print_command_( // Must activate data streamer with the required callback bool activated = model_->activate( filter_topic, + topics, callback); - - if (!activated) - { - view_.show_error(STR_ENTRY - << "Error showing data for topic <" - << topic_name - << "."); - return; - } } // Wait for other command to stop printing topics