From 9154c9fa7f45e56dfa43c9c7bb906562d2bb6619 Mon Sep 17 00:00:00 2001 From: Kerstin Keller Date: Wed, 24 Jan 2024 10:46:17 +0100 Subject: [PATCH] Template class for CMessageSubscriber (instead of CMsgSubscriber) - No inheritance required. For CProtoDynSubscriber implies API change. --- app/mon/mon_cli/CMakeLists.txt | 4 +- app/mon/mon_cli/src/ecal_mon_cli.cpp | 4 +- .../capnproto_reflection/CMakeLists.txt | 4 +- .../protobuf_reflection/CMakeLists.txt | 4 +- .../protobuf_reflection/src/plugin_widget.cpp | 12 +- .../protobuf_reflection/src/plugin_widget.h | 14 +- .../signals_plotting/CMakeLists.txt | 4 +- .../signals_plotting/src/plugin_widget.cpp | 12 +- .../signals_plotting/src/plugin_widget.h | 10 +- .../view/message_visualization/proto_tree.hpp | 6 +- .../viewmodel/message_visualization/proto.hpp | 13 +- ecal/core/include/ecal/ecal_types.h | 13 +- .../core/include/ecal/msg/capnproto/dynamic.h | 198 +++------- .../include/ecal/msg/capnproto/subscriber.h | 256 ++----------- ecal/core/include/ecal/msg/dynamic.h | 271 +++++++++++++- .../include/ecal/msg/flatbuffers/subscriber.h | 103 +----- .../include/ecal/msg/messagepack/subscriber.h | 102 +---- ecal/core/include/ecal/msg/protobuf/client.h | 5 +- .../ecal/msg/protobuf/dynamic_subscriber.h | 350 ++++-------------- ecal/core/include/ecal/msg/protobuf/server.h | 3 +- .../include/ecal/msg/protobuf/subscriber.h | 115 ++---- .../core/include/ecal/msg/string/subscriber.h | 109 ++---- ecal/core/include/ecal/msg/subscriber.h | 196 +++++++++- .../pubsub/protobuf/person_rec/CMakeLists.txt | 4 +- .../protobuf/proto_dyn_rec/CMakeLists.txt | 4 +- .../proto_dyn_rec/src/proto_dyn_rec.cpp | 6 +- .../cpp/pubsub_proto_test/CMakeLists.txt | 4 +- .../src/proto_dyn_subscriber_test.cpp | 15 +- .../capnp/addressbook_rec/CMakeLists.txt | 4 +- .../addressbook_rec/src/addressbook_rec.cpp | 4 +- .../capnp/addressbook_rec_cb/CMakeLists.txt | 4 +- .../addressbook_rec_dynamic/CMakeLists.txt | 4 +- .../src/addressbook_rec_dynamic.cpp | 8 +- .../capnp/addressbook_snd/CMakeLists.txt | 4 +- 34 files changed, 773 insertions(+), 1096 deletions(-) diff --git a/app/mon/mon_cli/CMakeLists.txt b/app/mon/mon_cli/CMakeLists.txt index 3b99c47dae..1b11199c36 100644 --- a/app/mon/mon_cli/CMakeLists.txt +++ b/app/mon/mon_cli/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ target_link_libraries(${PROJECT_NAME} tclap::tclap eCAL::core_protobuf eCAL::core_pb) -target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) ecal_install_app(${PROJECT_NAME}) diff --git a/app/mon/mon_cli/src/ecal_mon_cli.cpp b/app/mon/mon_cli/src/ecal_mon_cli.cpp index afa6727f04..39925f7f45 100644 --- a/app/mon/mon_cli/src/ecal_mon_cli.cpp +++ b/app/mon/mon_cli/src/ecal_mon_cli.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -363,7 +363,7 @@ void ProcProto(const std::string& topic_name, int msg_count) // create dynamic subscribers for receiving and decoding messages and assign callback eCAL::protobuf::CDynamicSubscriber sub(topic_name); std::atomic cnt(msg_count); - auto msg_cb = [&cnt](const google::protobuf::Message& msg_) { if (cnt != 0) { std::cout << msg_.DebugString() << std::endl; if (cnt > 0) cnt--; } }; + auto msg_cb = [&cnt](const std::shared_ptr& msg_) { if (cnt != 0) { std::cout << msg_->DebugString() << std::endl; if (cnt > 0) cnt--; } }; sub.AddReceiveCallback(std::bind(msg_cb, std::placeholders::_2)); // enter main loop diff --git a/app/mon/mon_plugins/capnproto_reflection/CMakeLists.txt b/app/mon/mon_plugins/capnproto_reflection/CMakeLists.txt index f948a3ac5b..0bea6fa90e 100644 --- a/app/mon/mon_plugins/capnproto_reflection/CMakeLists.txt +++ b/app/mon/mon_plugins/capnproto_reflection/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -67,6 +67,8 @@ target_link_libraries (${PROJECT_NAME} eCAL::mon_plugin_lib MonitorTreeView ) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) + target_link_options(${PROJECT_NAME} PRIVATE $<$:/ignore:4099>) target_include_directories(${PROJECT_NAME} PRIVATE src) diff --git a/app/mon/mon_plugins/protobuf_reflection/CMakeLists.txt b/app/mon/mon_plugins/protobuf_reflection/CMakeLists.txt index e76cb873e5..4848e5275d 100644 --- a/app/mon/mon_plugins/protobuf_reflection/CMakeLists.txt +++ b/app/mon/mon_plugins/protobuf_reflection/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -69,7 +69,7 @@ target_link_libraries (${PROJECT_NAME} MonitorTreeView eCAL::mon_plugin_lib ) -target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) if(MSVC) set_target_properties(${PROJECT_NAME} PROPERTIES COMPILE_FLAGS "/wd4127 /wd4714") diff --git a/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.cpp b/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.cpp index d7a2201c36..d0051340e7 100644 --- a/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.cpp +++ b/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -110,7 +110,7 @@ PluginWidget::~PluginWidget() { std::lock_guard lock(proto_message_mutex_); - delete last_proto_message_; + last_proto_message_.reset(); } } @@ -157,19 +157,15 @@ void PluginWidget::updatePublishTimeLabel() //////////////////////////////////////////////////////////////////////////////// // eCAL Callback -void PluginWidget::onProtoMessageCallback(const google::protobuf::Message& message, long long send_time_usecs) +void PluginWidget::onProtoMessageCallback(const std::shared_ptr& message, long long send_time_usecs) { { // Lock the mutex std::lock_guard lock(proto_message_mutex_); - // Delete the old message - delete last_proto_message_; - // Create a copy of the new message as member variable. We cannot use a reference here, as this may cause a deadlock with the GUI thread - last_proto_message_ = message.New(); - last_proto_message_->CopyFrom(message); + last_proto_message_ = message; last_message_publish_timestamp_ = eCAL::Time::ecal_clock::time_point(std::chrono::duration_cast(std::chrono::microseconds(send_time_usecs))); diff --git a/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.h b/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.h index 992c148e14..2ebbff3ed9 100644 --- a/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.h +++ b/app/mon/mon_plugins/protobuf_reflection/src/plugin_widget.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,11 +68,11 @@ private slots: eCAL::protobuf::CProtoDecoder protobuf_decoder; std::shared_ptr protobuf_tree_builder; - std::mutex proto_message_mutex_; - google::protobuf::Message* last_proto_message_; - eCAL::Time::ecal_clock::time_point last_message_publish_timestamp_; - QString last_error_string_; - bool last_message_was_error_; + std::mutex proto_message_mutex_; + std::shared_ptr last_proto_message_; + eCAL::Time::ecal_clock::time_point last_message_publish_timestamp_; + QString last_error_string_; + bool last_message_was_error_; bool currently_showing_error_item_; int error_counter_; @@ -83,7 +83,7 @@ private slots: bool new_msg_available_; int received_message_counter_; - void onProtoMessageCallback(const google::protobuf::Message& message, long long send_time_usecs); + void onProtoMessageCallback(const std::shared_ptr& message, long long send_time_usecs); void onProtoErrorCallback(const std::string& error); void updatePublishTimeLabel(); diff --git a/app/mon/mon_plugins/signals_plotting/CMakeLists.txt b/app/mon/mon_plugins/signals_plotting/CMakeLists.txt index e668f0bcef..961dfedabe 100644 --- a/app/mon/mon_plugins/signals_plotting/CMakeLists.txt +++ b/app/mon/mon_plugins/signals_plotting/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -84,7 +84,7 @@ target_link_libraries (${PROJECT_NAME} CustomQt ) -target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) if(MSVC) set_target_properties(${PROJECT_NAME} PROPERTIES COMPILE_FLAGS "/wd4127 /wd4714" ) diff --git a/app/mon/mon_plugins/signals_plotting/src/plugin_widget.cpp b/app/mon/mon_plugins/signals_plotting/src/plugin_widget.cpp index 80f8347b47..2b6ddd7159 100644 --- a/app/mon/mon_plugins/signals_plotting/src/plugin_widget.cpp +++ b/app/mon/mon_plugins/signals_plotting/src/plugin_widget.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -132,7 +132,7 @@ PluginWidget::~PluginWidget() { std::lock_guard lock(proto_message_mutex_); - delete last_proto_message_; + last_proto_message_.reset(); } } @@ -256,19 +256,15 @@ void PluginWidget::setVisibleSplitterHandle(bool state) //////////////////////////////////////////////////////////////////////////////// // eCAL Callback -void PluginWidget::onProtoMessageCallback(const google::protobuf::Message& message, long long send_time_usecs) +void PluginWidget::onProtoMessageCallback(const std::shared_ptr& message, long long send_time_usecs) { { // Lock the mutex std::lock_guard lock(proto_message_mutex_); - // Delete the old message - delete last_proto_message_; - // Create a copy of the new message as member variable. We cannot use a reference here, as this may cause a deadlock with the GUI thread - last_proto_message_ = message.New(); - last_proto_message_->CopyFrom(message); + last_proto_message_ = message; last_message_publish_timestamp_ = eCAL::Time::ecal_clock::time_point(std::chrono::duration_cast(std::chrono::microseconds(send_time_usecs))); diff --git a/app/mon/mon_plugins/signals_plotting/src/plugin_widget.h b/app/mon/mon_plugins/signals_plotting/src/plugin_widget.h index e10787c4ee..27ce7c290a 100644 --- a/app/mon/mon_plugins/signals_plotting/src/plugin_widget.h +++ b/app/mon/mon_plugins/signals_plotting/src/plugin_widget.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -102,9 +102,9 @@ private slots: eCAL::protobuf::CProtoDecoder protobuf_decoder; std::shared_ptr protobuf_tree_builder; - std::mutex proto_message_mutex_; - google::protobuf::Message* last_proto_message_; - eCAL::Time::ecal_clock::time_point last_message_publish_timestamp_; + std::mutex proto_message_mutex_; + std::shared_ptr last_proto_message_; + eCAL::Time::ecal_clock::time_point last_message_publish_timestamp_; QString last_error_string_; bool last_message_was_error_; int error_counter_; @@ -119,7 +119,7 @@ private slots: QString key_to_close_; - void onProtoMessageCallback(const google::protobuf::Message& message, long long send_time_usecs); + void onProtoMessageCallback(const std::shared_ptr& message, long long send_time_usecs); void onProtoErrorCallback(const std::string& error); void updatePublishTimeLabel(); bool find_items(QAbstractTreeItem* tree_item); diff --git a/app/mon/mon_tui/src/tui/view/message_visualization/proto_tree.hpp b/app/mon/mon_tui/src/tui/view/message_visualization/proto_tree.hpp index 9864bafff7..7d64516496 100644 --- a/app/mon/mon_tui/src/tui/view/message_visualization/proto_tree.hpp +++ b/app/mon/mon_tui/src/tui/view/message_visualization/proto_tree.hpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -261,7 +261,7 @@ class TreeMessageVisitor : public MessageVisitor } -void PopulateProtoTree(ftxui::TreeNode &root, google::protobuf::Message *message, const std::shared_ptr style) +void PopulateProtoTree(ftxui::TreeNode &root, const std::shared_ptr& message, const std::shared_ptr style) { auto tree_builder = std::make_shared(root, style); if(message) @@ -272,7 +272,7 @@ void PopulateProtoTree(ftxui::TreeNode &root, google::protobuf::Message *message } } -ftxui::TreeNodePtr ProtoTree(google::protobuf::Message *message, const std::shared_ptr style) +ftxui::TreeNodePtr ProtoTree(const std::shared_ptr& message, const std::shared_ptr style) { using namespace ftxui; auto root = std::make_shared(); diff --git a/app/mon/mon_tui/src/tui/viewmodel/message_visualization/proto.hpp b/app/mon/mon_tui/src/tui/viewmodel/message_visualization/proto.hpp index 1a170baf1d..220236ae3d 100644 --- a/app/mon/mon_tui/src/tui/viewmodel/message_visualization/proto.hpp +++ b/app/mon/mon_tui/src/tui/viewmodel/message_visualization/proto.hpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -35,15 +36,13 @@ class ProtoMessageVisualizationViewModel : public MessageVisualizationViewModel eCAL::protobuf::CDynamicSubscriber subscriber; mutable std::mutex message_mtx; - google::protobuf::Message *latest_message = nullptr; + std::shared_ptr latest_message = nullptr; - void OnMessage(const google::protobuf::Message& message, long long send_time_usecs) + void OnMessage(const std::shared_ptr& message, long long send_time_usecs) { { std::lock_guard lock(message_mtx); - delete latest_message; - latest_message = message.New(); - latest_message->CopyFrom(message); + latest_message = message; message_timestamp = send_time_usecs; } @@ -56,7 +55,7 @@ class ProtoMessageVisualizationViewModel : public MessageVisualizationViewModel //NOTE: Use with caution! struct ProtectedMessage { - google::protobuf::Message *message; + std::shared_ptr message; std::unique_lock lock; }; diff --git a/ecal/core/include/ecal/ecal_types.h b/ecal/core/include/ecal/ecal_types.h index 14187eaf6f..5e32297c9e 100644 --- a/ecal/core/include/ecal/ecal_types.h +++ b/ecal/core/include/ecal/ecal_types.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ #pragma once #include +#include namespace eCAL { @@ -47,6 +48,11 @@ namespace eCAL { return !(*this == other); } + + bool operator<(const SDataTypeInformation& rhs) const + { + return std::tie(name, encoding, descriptor) < std::tie(rhs.name, rhs.encoding, rhs.descriptor); + } //!< @endcond }; @@ -69,6 +75,11 @@ namespace eCAL { return !(*this == other); } + + bool operator<(const SServiceMethodInformation& rhs) const + { + return std::tie(request_type, response_type) < std::tie(rhs.request_type, rhs.response_type); + } //!< @endcond }; } diff --git a/ecal/core/include/ecal/msg/capnproto/dynamic.h b/ecal/core/include/ecal/msg/capnproto/dynamic.h index c856afda15..1061768c1c 100644 --- a/ecal/core/include/ecal/msg/capnproto/dynamic.h +++ b/ecal/core/include/ecal/msg/capnproto/dynamic.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,177 +32,67 @@ #pragma warning(pop) #endif /*_MSC_VER*/ -#include -#include +#include #include namespace eCAL { - namespace capnproto + namespace internal { - class CDynamicSubscriber + class CapnprotoDynamicDeserializer { public: - /** - * @brief Constructor. - **/ - CDynamicSubscriber() - : subscriber() - , builder() - , initialized(false) - { - } - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - CDynamicSubscriber(const std::string& topic_name_) - : subscriber(topic_name_, GetDataTypeInformation()) - , builder() - , initialized(false) - { - } - - /** - * @brief Copy Constructor is not available. - **/ - CDynamicSubscriber(const CDynamicSubscriber&) = delete; - - /** - * @brief Copy Constructor is not available. - **/ - CDynamicSubscriber& operator=(const CDynamicSubscriber&) = delete; - - /** - * @brief Move Constructor - **/ - CDynamicSubscriber(CDynamicSubscriber&&) = default; - - /** - * @brief Move assignment - **/ - CDynamicSubscriber& operator=(CDynamicSubscriber&&) = default; - - /** - * @brief eCAL protobuf message receive callback function - * - * @param topic_name_ Topic name of the data source (publisher). - * @param msg_ Protobuf message content. - * @param time_ Message time stamp. - **/ - typedef std::function CapnpDynamicMsgCallbackT; - - /** - * @brief Add callback function for incoming receives. - * - * @param callback_ The callback function to add. - * - * @return True if succeeded, false if not. - **/ - bool AddReceiveCallback(CapnpDynamicMsgCallbackT callback_) - { - msg_callback = callback_; - return subscriber.AddReceiveCallback(std::bind(&CDynamicSubscriber::OnReceive, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); - } - - bool RemReceiveCallback() + // This function is NOT threadsafe!!! + // what about the lifetime of the objects? + // It's totally unclear to me :/ + capnp::DynamicStruct::Reader Deserialize(const void* buffer_, size_t size_, const SDataTypeInformation& datatype_info_) { - auto ret = subscriber.RemReceiveCallback(); - msg_callback = nullptr; - return ret; - } + try + { + // Put the pointer into a capnp::MallocMessageBuilder, it holds the memory to later access the object via a capnp::Dynami + kj::ArrayPtr words = kj::arrayPtr(reinterpret_cast(buffer_), size_ / sizeof(capnp::word)); + kj::ArrayPtr rest = initMessageBuilderFromFlatArrayCopy(words, m_msg_builder); - void OnReceive(const char* topic_name_, const capnp::MallocMessageBuilder& msg_, long long time_, long long clock_, long long id_) - { - if (!initialized) + capnp::Schema schema = GetSchema(datatype_info_); + capnp::DynamicStruct::Builder root_builder = m_msg_builder.getRoot(schema.asStruct()); + return root_builder.asReader(); + } + catch (...) { - SDataTypeInformation topic_info_; - eCAL::Util::GetTopicDataTypeInformation(topic_name_, topic_info_); - std::string topic_desc = topic_info_.descriptor; - if (!topic_desc.empty()) - { - // We initialize the builder from the string - schema = eCAL::capnproto::SchemaFromDescriptor(topic_desc, loader); - initialized = true; - } - else - { - return; - } + throw new DynamicReflectionException("Error deserializing Capnproto data."); } - - auto root = const_cast(msg_).getRoot(schema.asStruct()); - msg_callback(topic_name_, root.asReader(), time_, clock_, id_); } - /** - * @brief get a Pointer to a temporary message that can be passed to receive - **/ - typename capnp::DynamicStruct::Reader getReader() - { - return root_builder.asReader(); - } - - /** - * @brief Manually receive the next sample - **/ - bool Receive(long long* time_ = nullptr, int rcv_timeout_ = 0) - { - return subscriber.Receive(builder, time_, rcv_timeout_); - } - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - bool Create(const std::string& topic_name_) - { - return(subscriber.Create(topic_name_, GetDataTypeInformation())); - } - - /** - * @brief Get type name of the capnp message. - * - * @return Type name. - **/ - ECAL_DEPRECATE_SINCE_5_13("Please use the method SDataTypeInformation GetDataTypeInformation() instead. You can extract the typename from the STopicInformation variable. This function will be removed in eCAL6.") - std::string GetTypeName() const - { - return (""); - } private: - /** - * @brief Get topic information of the message. - * - * @return Topic information. - **/ - SDataTypeInformation GetDataTypeInformation() const + capnp::Schema GetSchema(const SDataTypeInformation& datatype_info_) { - SDataTypeInformation topic_info; - // this is dynamic information. what should we return now? - return topic_info; + auto schema = m_schema_map.find(datatype_info_); + if (schema != m_schema_map.end()) + { + m_schema_map[datatype_info_] = ::eCAL::capnproto::SchemaFromDescriptor(datatype_info_.descriptor, m_loader); + } + return m_schema_map[datatype_info_]; } - - CBuilderSubscriber subscriber; - capnp::MallocMessageBuilder builder; - capnp::DynamicStruct::Builder root_builder; - CapnpDynamicMsgCallbackT msg_callback; - - capnp::schema::Node::Reader reader; - capnp::SchemaLoader loader; - capnp::Schema schema; - - bool initialized; - + capnp::MallocMessageBuilder m_msg_builder; + std::map m_schema_map; + capnp::SchemaLoader m_loader; }; + } + namespace capnproto + { + /** + * @brief eCAL capnp subscriber class. + * + * Subscriber template class for capnp messages. For details see documentation of CSubscriber class. + * + **/ + using CDynamicSubscriber = CDynamicMessageSubscriber; + + /** @example addressbook_rec.cpp + * This is an example how to use eCAL::capnproto::CSubscriber to receive capnp data with eCAL. To receive the data, see @ref addressbook_rec.cpp . + */ } - -} \ No newline at end of file +} diff --git a/ecal/core/include/ecal/msg/capnproto/subscriber.h b/ecal/core/include/ecal/msg/capnproto/subscriber.h index 02cb9edac5..3574773c88 100644 --- a/ecal/core/include/ecal/msg/capnproto/subscriber.h +++ b/ecal/core/include/ecal/msg/capnproto/subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,249 +37,51 @@ #pragma warning(pop) #endif /*_MSC_VER*/ - namespace eCAL { - namespace capnproto + namespace internal { - /** - * @brief eCAL capnp subscriber class. - * - * Subscriber template class for capnp messages. For details see documentation of CSubscriber class. - * - **/ - class CBuilderSubscriber : public CMsgSubscriber + template + class CapnprotoDeserializer { public: - - /** - * @brief Constructor. - **/ - CBuilderSubscriber() : CMsgSubscriber() + SDataTypeInformation GetDataTypeInformation() { + SDataTypeInformation topic_info; + topic_info.encoding = eCAL::capnproto::EncodingAsString(); + topic_info.name = eCAL::capnproto::TypeAsString(); + topic_info.descriptor = eCAL::capnproto::SchemaAsString(); + return topic_info; } - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - CBuilderSubscriber(const std::string& topic_name_, const SDataTypeInformation& topic_info_) : CMsgSubscriber(topic_name_, topic_info_) - { - } - - /** - * @brief Destructor - **/ - ~CBuilderSubscriber() override - { - this->Destroy(); - } - - /** - * @brief Copy Constructor is not available. - **/ - CBuilderSubscriber(const CBuilderSubscriber&) = delete; - - /** - * @brief Copy Constructor is not available. - **/ - CBuilderSubscriber& operator=(const CBuilderSubscriber&) = delete; - - /** - * @brief Move Constructor - **/ - CBuilderSubscriber(CBuilderSubscriber&&) = default; - - /** - * @brief Move assignment - **/ - CBuilderSubscriber& operator=(CBuilderSubscriber&&) = default; - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - bool Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_) - { - return(CMsgSubscriber::Create(topic_name_, topic_info_)); - } + // This function is NOT threadsafe!!! + bool Deserialize(typename T::Reader& msg_, const void* buffer_, size_t size_) + { + kj::ArrayPtr words = kj::arrayPtr(reinterpret_cast(buffer_), size_ / sizeof(capnp::word)); + kj::ArrayPtr rest = initMessageBuilderFromFlatArrayCopy(words, m_msg_builder); + typename T::Builder root_builder = typename T::Builder(m_msg_builder.getRoot()); + msg_ = root_builder.asReader(); - /** - * @brief Deserialize the message object from a message buffer. - * - * @param [out] msg_ The message object. - * @param buffer_ Source buffer. - * @param size_ Source buffer size. - * - * @return True if it succeeds, false if it fails. - **/ - bool Deserialize(capnp::MallocMessageBuilder& msg_, const void* buffer_, size_t size_) const - { - kj::ArrayPtr words = kj::arrayPtr(reinterpret_cast(buffer_), size_ / sizeof(capnp::word)); - kj::ArrayPtr rest = initMessageBuilderFromFlatArrayCopy(words, msg_); return(rest.size() == 0); } + + private: + capnp::MallocMessageBuilder m_msg_builder; }; - /** @example addressbook_rec.cpp - * This is an example how to use eCAL::CCapnpSubscriber to receive capnp data with eCAL. To send the data, see @ref addressbook_snd.cpp . - */ + } + namespace capnproto + { /** - * @brief eCAL capnp subscriber class. - * - * Subscriber template class for capnp messages. For details see documentation of CSubscriber class. - * + * @brief eCAL capnp subscriber class. + * + * Subscriber template class for capnp messages. For details see documentation of CSubscriber class. + * **/ - template - class CSubscriber - { - public: - /** - * @brief Constructor. - **/ - CSubscriber() - : subscriber() - , builder() - , root_builder(builder.getRoot()) - { - } - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - CSubscriber(const std::string& topic_name_) - : subscriber(topic_name_, GetDataTypeInformation()) - , builder() - , root_builder(builder.getRoot()) - { - } - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber(const CSubscriber&) = delete; - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber& operator=(const CSubscriber&) = delete; - - /** - * @brief Move Constructor - **/ - CSubscriber(CSubscriber&&) = default; - - /** - * @brief Move assignment - **/ - CSubscriber& operator=(CSubscriber&&) = default; - - /** - * @brief eCAL protobuf message receive callback function - * - * @param topic_name_ Topic name of the data source (publisher). - * @param msg_ Protobuf message content. - * @param time_ Message time stamp. - **/ - typedef std::function MsgCallbackT; - - /** - * @brief Add callback function for incoming receives. - * - * @param callback_ The callback function to add. - * - * @return True if succeeded, false if not. - **/ - bool AddReceiveCallback(MsgCallbackT callback_) - { - msg_callback = callback_; - return subscriber.AddReceiveCallback(std::bind(&CSubscriber::OnReceive, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); - } - - bool RemReceiveCallback() - { - auto ret{subscriber.RemReceiveCallback()}; - msg_callback = nullptr; - return ret; - } - - void OnReceive(const char* topic_name_, const capnp::MallocMessageBuilder& msg_, long long time_, long long clock_, long long id_) - { - auto root = const_cast(msg_).getRoot(); - msg_callback(topic_name_, root, time_, clock_, id_); - } - - /** - * @brief get a Pointer to a temporary message that can be passed to receive - **/ - typename message_type::Reader getReader() - { - return root_builder.asReader(); - } - - /** - * @brief Manually receive the next sample - **/ - bool Receive(long long* time_ = nullptr, int rcv_timeout_ = 0) - { - bool success = subscriber.Receive(builder, time_, rcv_timeout_); - // Update the Reader - root_builder = typename message_type::Builder(builder.getRoot()); - return success; - } - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - bool Create(const std::string& topic_name_) - { - return(subscriber.Create(topic_name_, GetDataTypeInformation())); - } - - /** - * @brief Get type name of the capnp message. - * - * @return Type name. - **/ - ECAL_DEPRECATE_SINCE_5_13("Please use SDataTypeInformation GetDataTypeInformation() instead. This function will be removed in eCAL6.") - std::string GetTypeName() const - { - return eCAL::capnproto::TypeAsString(); - } + template + using CSubscriber = CMessageSubscriber>; - private: - /** - * @brief Get topic information of the message. - * - * @return Topic information. - **/ - SDataTypeInformation GetDataTypeInformation() const - { - SDataTypeInformation topic_info; - topic_info.encoding = eCAL::capnproto::EncodingAsString(); - topic_info.name = eCAL::capnproto::TypeAsString(); - topic_info.descriptor = eCAL::capnproto::SchemaAsString(); - return topic_info; - } - - CBuilderSubscriber subscriber; - capnp::MallocMessageBuilder builder; - typename message_type::Builder root_builder; - MsgCallbackT msg_callback; - - - }; /** @example addressbook_rec.cpp * This is an example how to use eCAL::capnproto::CSubscriber to receive capnp data with eCAL. To receive the data, see @ref addressbook_rec.cpp . */ diff --git a/ecal/core/include/ecal/msg/dynamic.h b/ecal/core/include/ecal/msg/dynamic.h index d84721b325..78603f594f 100644 --- a/ecal/core/include/ecal/msg/dynamic.h +++ b/ecal/core/include/ecal/msg/dynamic.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,12 +24,16 @@ #pragma once -#include #include +#include +#include +#include + +#include +#include namespace eCAL { - /* @cond */ class DynamicReflectionException : public std::exception { @@ -40,7 +44,7 @@ namespace eCAL std::string message_; }; /* @endcond */ - + /* @cond */ inline bool StrEmptyOrNull(const std::string& str) { @@ -61,4 +65,263 @@ namespace eCAL } } /* @endcond */ + + + template + class CDynamicMessageSubscriber final : public CSubscriber + { + public: + /** + * @brief Constructor. + **/ + CDynamicMessageSubscriber() : CSubscriber() + { + } + + /** + * @brief Constructor. + * + * @param topic_name_ Unique topic name. + **/ + CDynamicMessageSubscriber(const std::string& topic_name_) : CSubscriber() + , m_topic_name(topic_name_) + , m_deserializer() + { + CSubscriber::Create(topic_name_); + } + + ~CDynamicMessageSubscriber() noexcept + { + Destroy(); + }; + + /** + * @brief Copy Constructor is not available. + **/ + CDynamicMessageSubscriber(const CDynamicMessageSubscriber&) = delete; + + /** + * @brief Copy Constructor is not available. + **/ + CDynamicMessageSubscriber& operator=(const CDynamicMessageSubscriber&) = delete; + + /** + * @brief Move Constructor + **/ + CDynamicMessageSubscriber(CDynamicMessageSubscriber&& rhs) + : CSubscriber(std::move(rhs)) + , m_topic_name(std::move(rhs.m_topic_name)) + , m_cb_callback(std::move(rhs.m_cb_callback)) + , m_deserializer(std::move(rhs.m_deserializer)) + { + bool has_callback = (m_cb_callback != nullptr); + + if (has_callback) + { + // the callback bound to the CSubscriber belongs to rhs, bind to this callback instead + CSubscriber::RemReceiveCallback(); + auto callback = std::bind(&CDynamicMessageSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2); + CSubscriber::AddReceiveCallback(callback); + } + } + + /** + * @brief Move assignment not available + **/ + CDynamicMessageSubscriber& operator=(CDynamicMessageSubscriber&& rhs) = delete; + + /** + * @brief Destroys this object. + * + * @return true if it succeeds, false if it fails. + **/ + bool Destroy() + { + RemReceiveCallback(); + return(CSubscriber::Destroy()); + } + + /** + * @brief Receive deserialized message. + * + * @param [out] time_ Optional receive time stamp. + * @param rcv_timeout_ Receive timeout in ms. + * + * @return std::optional which holds the value if a value could be received, and std::nullopt if it couldn't. + **/ + // Do we want to call error callbacks on receive? Probably not! std::expected wouuld be a good thing to return the reason why things went wrong. + std::optional Receive(long long* time_ = nullptr, int rcv_timeout_ = 0) + { + std::string rec_buf; + bool success = CSubscriber::ReceiveBuffer(rec_buf, time_, rcv_timeout_); + if (!success) + { + return std::nullopt; + } + // In the future, I would like to get m_datatype_info from the ReceiveBuffer function! + + PopulateDatatypeInfo(); + // We can't possibly receive anything if we don't have datatype info available + if (!m_datatype_info_received) + { + return std::nullopt; + } + + try + { + return(m_deserializer.Deserialize(rec_buf.c_str(), rec_buf.size(), m_datatype_info_received.value())); + } + catch (const DynamicReflectionException& /*e*/) + { + return std::nullopt; + } + } + + /** + * @brief eCAL message receive callback function + * + * @param topic_name_ Topic name of the data source (publisher). + * @param msg_ Message content. + * @param time_ Message time stamp. + * @param clock_ Message writer clock. + * @param id_ Message id. + **/ + using MsgReceiveCallbackT = std::function; + + /** + * @brief Add receive callback for incoming messages. + * + * @param callback_ The callback function. + * + * @return True if it succeeds, false if it fails. + **/ + bool AddReceiveCallback(MsgReceiveCallbackT callback_) + { + RemReceiveCallback(); + + { + std::lock_guard callback_lock(m_cb_callback_mutex); + m_cb_callback = callback_; + } + auto callback = std::bind(&CDynamicMessageSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2); + return(CSubscriber::AddReceiveCallback(callback)); + } + + /** + * @brief Remove receive callback for incoming messages. + * + * @return True if it succeeds, false if it fails. + **/ + bool RemReceiveCallback() + { + bool ret = CSubscriber::RemReceiveCallback(); + + std::lock_guard callback_lock(m_cb_callback_mutex); + if (m_cb_callback == nullptr) return(false); + m_cb_callback = nullptr; + return(ret); + } + + /** + * @brief Callback function in case an error occurs. + * + * @param error The error message string. + **/ + using ErrorCallbackT = std::function; + + /** + * @brief Add callback function in case an error occurs. + * + * @param callback_ The callback function to add. + * + * @return True if succeeded, false if not. + **/ + bool AddErrorCallback(ErrorCallbackT callback_) + { + std::lock_guard callback_lock(m_error_callback_mutex); + m_error_callback = callback_; + + return true; + } + + /** + * @brief Remove callback function in case an error occurs. + * + * @return True if succeeded, false if not. + **/ + bool RemErrorCallback() + { + std::lock_guard callback_lock(m_error_callback_mutex); + m_error_callback = nullptr; + + return true; + } + + private: + void ReceiveCallback(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_) + { + MsgReceiveCallbackT fn_callback = nullptr; + { + std::lock_guard callback_lock(m_cb_callback_mutex); + fn_callback = m_cb_callback; + } + + if (fn_callback == nullptr) return; + + PopulateDatatypeInfo(); + + if (!m_datatype_info_received) + { + CallErrorCallback("Dynamic Deserialization: No Prototype available."); + return; + } + + try + { + // In the future, I would like to get m_datatype_info from the ReceiveBuffer function! + auto msg = m_deserializer.Deserialize(data_->buf, data_->size, m_datatype_info_received.value()); + fn_callback(topic_name_, msg, data_->time, data_->clock, data_->id); + } + catch (const DynamicReflectionException& e) + { + CallErrorCallback(std::string("Dynamic Deserialization: Error deserializing data: ") + e.what() ); + } + } + + void PopulateDatatypeInfo() + { + if (!m_datatype_info_received) + { + SDataTypeInformation datatype_info_received; + auto received_info = eCAL::Util::GetTopicDataTypeInformation(m_topic_name, datatype_info_received); + // empty datatype informations are not valid to do reflection on! + if (received_info && datatype_info_received != SDataTypeInformation{}) + { + m_datatype_info_received = datatype_info_received; + } + } + } + + void CallErrorCallback(const std::string& message) + { + ErrorCallbackT error_callback = nullptr; + { + std::lock_guard callback_lock(m_error_callback_mutex); + error_callback = m_error_callback; + } + if (error_callback) + { + error_callback(message); + } + } + + std::string m_topic_name; + std::mutex m_cb_callback_mutex; + MsgReceiveCallbackT m_cb_callback; + std::mutex m_error_callback_mutex; + ErrorCallbackT m_error_callback; + DynamicDeserializer m_deserializer; + std::optional m_datatype_info_received = std::nullopt; + }; + } diff --git a/ecal/core/include/ecal/msg/flatbuffers/subscriber.h b/ecal/core/include/ecal/msg/flatbuffers/subscriber.h index 0b274b2f39..714a8c8075 100644 --- a/ecal/core/include/ecal/msg/flatbuffers/subscriber.h +++ b/ecal/core/include/ecal/msg/flatbuffers/subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,81 +28,13 @@ namespace eCAL { - namespace flatbuffers + namespace internal { - /** - * @brief eCAL google::flatbuffers subscriber class. - * - * Subscriber template class for google::flatbuffers messages. For details see documentation of CSubscriber class. - * - **/ template - class CSubscriber : public CMsgSubscriber + class FlatbuffersDeserializer { public: - /** - * @brief Constructor. - **/ - CSubscriber() : CMsgSubscriber() - { - } - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - CSubscriber(const std::string& topic_name_) : CMsgSubscriber(topic_name_, CSubscriber::GetDataTypeInformation()) - { - } - - /** - * @brief Destructor - **/ - ~CSubscriber() override - { - this->Destroy(); - } - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber(const CSubscriber&) = delete; - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber& operator=(const CSubscriber&) = delete; - - /** - * @brief Move Constructor - **/ - CSubscriber(CSubscriber&&) = default; - - /** - * @brief Move assignment - **/ - CSubscriber& operator=(CSubscriber&&) = default; - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - bool Create(const std::string& topic_name_) - { - return(CMsgSubscriber::Create(topic_name_, GetDataTypeInformation())); - } - - private: - /** - * @brief Get topic information of the flatbuffers message. - * - * @return Topic information. - **/ - SDataTypeInformation GetDataTypeInformation() const override + static SDataTypeInformation GetDataTypeInformation() { SDataTypeInformation topic_info; topic_info.encoding = "flatb"; @@ -110,23 +42,28 @@ namespace eCAL return topic_info; } - /** - * @brief Deserialize the message object from a message buffer. - * - * @param [out] msg_ The message object. - * @param buffer_ Source buffer. - * @param size_ Source buffer size. - * - * @return True if it succeeds, false if it fails. - **/ - bool Deserialize(T& msg_, const void* buffer_, size_t size_) const + static bool Deserialize(T& msg_, const void* buffer_, size_t size_) { msg_.PushFlatBuffer(static_cast(buffer_), static_cast(size_)); return(true); } }; + } + + namespace flatbuffers + { + + /** + * @brief eCAL google::flatbuffers subscriber class. + * + * Subscriber template class for google::flatbuffers messages. For details see documentation of CSubscriber class. + * + **/ + template + using CSubscriber = CMessageSubscriber>; + /** @example monster_rec.cpp * This is an example how to use eCAL::CSubscriber to receive goggle::flatbuffers data with eCAL. To send the data, see @ref monster_snd.cpp . */ } -} +} \ No newline at end of file diff --git a/ecal/core/include/ecal/msg/messagepack/subscriber.h b/ecal/core/include/ecal/msg/messagepack/subscriber.h index ece4e0d4ef..dc72be7ab5 100644 --- a/ecal/core/include/ecal/msg/messagepack/subscriber.h +++ b/ecal/core/include/ecal/msg/messagepack/subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,81 +31,13 @@ namespace eCAL { - namespace messagepack + namespace internal { - /** - * @brief eCAL msgpack subscriber class. - * - * Subscriber template class for msgpack messages. For details see documentation of CSubscriber class. - * - **/ template - class CSubscriber : public CMsgSubscriber + class MessagePackDeserializer { public: - /** - * @brief Constructor. - **/ - CSubscriber() : CMsgSubscriber() - { - } - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - CSubscriber(const std::string& topic_name_) : CMsgSubscriber(topic_name_, GetDataTypeInformation()) - { - } - - /** - * @brief Destructor - **/ - ~CSubscriber() override - { - this->Destroy(); - } - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber(const CSubscriber&) = delete; - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber& operator=(const CSubscriber&) = delete; - - /** - * @brief Move Constructor - **/ - CSubscriber(CSubscriber&&) = default; - - /** - * @brief Move assignment - **/ - CSubscriber& operator=(CSubscriber&&) = default; - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - bool Create(const std::string& topic_name_) - { - return(CMsgSubscriber::Create(topic_name_, GetDataTypeInformation())); - } - - private: - /** - * @brief Get topic information of the message. - * - * @return Topic information. - **/ - SDataTypeInformation GetDataTypeInformation() const override + static SDataTypeInformation GetDataTypeInformation() { SDataTypeInformation topic_info; topic_info.encoding = "mpack"; @@ -113,16 +45,7 @@ namespace eCAL return topic_info; } - /** - * @brief Deserialize the message object from a message buffer. - * - * @param [out] msg_ The message object. - * @param buffer_ Source buffer. - * @param size_ Source buffer size. - * - * @return True if it succeeds, false if it fails. - **/ - bool Deserialize(T& msg_, const void* buffer_, size_t size_) const + static bool Deserialize(T& msg_, const void* buffer_, size_t size_, const SDataTypeInformation& /*datatype_info_*/) { msgpack::unpacked ubuffer; msgpack::unpack(ubuffer, static_cast(buffer_), size_); @@ -131,8 +54,23 @@ namespace eCAL return(true); } }; + } + + namespace messagepack + { + + /** + * @brief eCAL msgpack subscriber class. + * + * Subscriber template class for msgpack messages. For details see documentation of CSubscriber class. + * + **/ + template + using CSubscriber = CMessageSubscriber>; + /** @example address_rec.cpp * This is an example how to use eCAL::CSubscriber to receive msgpack data with eCAL. To send the data, see @ref address_snd.cpp . */ } } + diff --git a/ecal/core/include/ecal/msg/protobuf/client.h b/ecal/core/include/ecal/msg/protobuf/client.h index 66c4c7db0f..b98606e33c 100644 --- a/ecal/core/include/ecal/msg/protobuf/client.h +++ b/ecal/core/include/ecal/msg/protobuf/client.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,11 +26,8 @@ #include #include -#include #include - - // protobuf includes #ifdef _MSC_VER #pragma warning(push, 0) // disable proto warnings diff --git a/ecal/core/include/ecal/msg/protobuf/dynamic_subscriber.h b/ecal/core/include/ecal/msg/protobuf/dynamic_subscriber.h index 032536e89a..b95d766bb6 100644 --- a/ecal/core/include/ecal/msg/protobuf/dynamic_subscriber.h +++ b/ecal/core/include/ecal/msg/protobuf/dynamic_subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,306 +46,88 @@ namespace eCAL { - namespace protobuf - { - /** - * @brief eCAL dynamic protobuf subscriber. - **/ - class CDynamicSubscriber + namespace internal { - public: - /** - * @brief Constructor. - **/ - CDynamicSubscriber(); - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - CDynamicSubscriber(const std::string& topic_name_); - - /** - * @brief Destructor. - **/ - virtual ~CDynamicSubscriber(); - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return true if it succeeds, false if it fails. - **/ - void Create(const std::string& topic_name_); - - /** - * @brief Destroys this object. - * - * @return true if it succeeds, false if it fails. - **/ - void Destroy(); - - /** - * @brief Query if this object is created. - * - * @return true if created, false if not. - **/ - bool IsCreated() { return(created); } - - /** - * @brief eCAL protobuf message receive callback function - * - * @param topic_name_ Topic name of the data source (publisher). - * @param msg_ Protobuf message content. - * @param time_ Message time stamp. - **/ - typedef std::function ProtoMsgCallbackT; - - /** - * @brief Add callback function for incoming receives. - * - * @param callback_ The callback function to add. - * - * @return True if succeeded, false if not. - **/ - bool AddReceiveCallback(ProtoMsgCallbackT callback_); - - /** - * @brief get a Pointer to a temporary message that can be passed to receive - * - **/ - google::protobuf::Message* getMessagePointer(); - - /** - * @brief Manually receive the next sample - **/ - bool Receive(google::protobuf::Message& msg_, long long* time_ = nullptr, int rcv_timeout_ = 0); - - /** - * @brief Remove callback function for incoming receives. - * - * @return True if succeeded, false if not. - **/ - bool RemReceiveCallback(); - - /** - * @brief Callback function in case an error occurs. - * - * @param error The error message string. - **/ - typedef std::function ProtoErrorCallbackT; - - /** - * @brief Add callback function in case an error occurs. - * - * @param callback_ The callback function to add. - * - * @return True if succeeded, false if not. - **/ - bool AddErrorCallback(ProtoErrorCallbackT callback_); - - /** - * @brief Remove callback function in case an error occurs. - * - * @return True if succeeded, false if not. - **/ - bool RemErrorCallback(); - - protected: - void OnReceive(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_); - - std::shared_ptr CreateMessagePointer(const std::string& topic_name_); - - bool created; - std::string topic_name; - std::unique_ptr msg_decoder; - std::shared_ptr msg_ptr; - eCAL::CSubscriber msg_sub; - ProtoMsgCallbackT msg_callback; - ProtoErrorCallbackT err_callback; - - private: - // this object must not be copied. - CDynamicSubscriber(const CDynamicSubscriber&); - CDynamicSubscriber& operator=(const CDynamicSubscriber&); - }; - /** @example proto_dyn.cpp - * This is an example how to use CDynamicSubscriber to receive dynamic google::protobuf data with eCAL. - **/ - - inline CDynamicSubscriber::CDynamicSubscriber() : - created(false), - msg_decoder(nullptr) - { - } - - inline CDynamicSubscriber::CDynamicSubscriber(const std::string& topic_name_) : - created(false), - msg_decoder(nullptr) - { - Create(topic_name_); - } - - inline CDynamicSubscriber::~CDynamicSubscriber() - { - Destroy(); - } - - inline void CDynamicSubscriber::Create(const std::string& topic_name_) - { - if (created) return; - - // save the topic name (required for receive polling) - topic_name = topic_name_; - - // create message decoder - msg_decoder = std::make_unique(); - - // create subscriber - msg_sub.Create(topic_name_); - - created = true; - } - - inline void CDynamicSubscriber::Destroy() - { - if (!created) return; - - // destroy subscriber - msg_sub.Destroy(); - - // delete message pointer - msg_ptr = nullptr; - - // delete message decoder - msg_decoder.reset(); - - created = false; - } - - inline google::protobuf::Message* CDynamicSubscriber::getMessagePointer() - { - try + class ProtobufDynamicDeserializer { - // Create Message Pointer for our topic name. - if (msg_ptr == nullptr) + public: + std::shared_ptr Deserialize(const void* buffer_, size_t size_, const SDataTypeInformation& datatype_info_) { - msg_ptr = CreateMessagePointer(topic_name); - } - } - catch (DynamicReflectionException& /*e*/) - { - return nullptr; - } - return msg_ptr.get(); - } - - inline bool CDynamicSubscriber::Receive(google::protobuf::Message& msg_, long long* time_, int rcv_timeout_) - { - std::string rec_buf; - bool success = msg_sub.ReceiveBuffer(rec_buf, time_, rcv_timeout_); - if (!success) return(false); - - // Parse current message content - auto parsed = msg_.ParseFromString(rec_buf); - return parsed; - } - - inline bool CDynamicSubscriber::AddReceiveCallback(ProtoMsgCallbackT callback_) - { - msg_callback = callback_; - msg_sub.AddReceiveCallback(std::bind(&CDynamicSubscriber::OnReceive, this, std::placeholders::_1, std::placeholders::_2)); - return(true); - } - - inline bool CDynamicSubscriber::RemReceiveCallback() - { - msg_sub.RemReceiveCallback(); - msg_callback = nullptr; - return(true); - } + auto message_prototype = GetMessagePointer(datatype_info_); + // for some reason cannot use std::make_shared, however should be ok in this context. + std::shared_ptr message_with_content(message_prototype->New()); + message_with_content->CopyFrom(*message_prototype); - inline bool CDynamicSubscriber::AddErrorCallback(ProtoErrorCallbackT callback_) - { - err_callback = callback_; - return(true); - } - - inline bool CDynamicSubscriber::RemErrorCallback() - { - err_callback = nullptr; - return(true); - } + try + { + message_with_content->ParseFromArray(buffer_, (int)size_); + return message_with_content; + } + catch (...) + { + throw new DynamicReflectionException("Error deserializing Protobuf data."); + } + } - inline void CDynamicSubscriber::OnReceive(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_) - { - try - { - // Create Message Pointer for our topic name. - if (msg_ptr == nullptr) + private: + std::shared_ptr GetMessagePointer(const SDataTypeInformation& datatype_info_) { - msg_ptr = CreateMessagePointer(topic_name_); + auto schema = m_message_map.find(datatype_info_); + if (schema == m_message_map.end()) + { + m_message_map[datatype_info_] = CreateMessagePointer(datatype_info_); + } + return m_message_map[datatype_info_]; } - if (msg_callback && msg_ptr) + std::shared_ptr CreateMessagePointer(const SDataTypeInformation& topic_info_) { - // Parse current message content - auto parsed = msg_ptr->ParseFromArray(data_->buf, data_->size); - if (parsed) + // get topic type + std::string topic_type{ topic_info_.name }; + topic_type = topic_type.substr(topic_type.find_last_of('.') + 1, topic_type.size()); + if (StrEmptyOrNull(topic_type)) { - msg_callback(topic_name_, *msg_ptr, data_->time); + throw DynamicReflectionException("ProtobufDynamicDeserializer: Could not get type"); } - else + + std::string topic_desc = topic_info_.descriptor; + if (StrEmptyOrNull(topic_desc)) { - throw DynamicReflectionException("CDynamicSubscriber: DataContent could not be parsed"); + throw DynamicReflectionException("ProtobufDynamicDeserializer: Could not get description for type" + std::string(topic_type)); } + + google::protobuf::FileDescriptorSet proto_desc; + proto_desc.ParseFromString(topic_desc); + std::string error_s; + std::shared_ptr proto_msg_ptr(m_dynamic_decoder.GetProtoMessageFromDescriptorSet(proto_desc, topic_type, error_s)); + if (proto_msg_ptr == nullptr) + { + std::stringstream s; + s << "ProtobufDynamicDeserializer: Message of type " + std::string(topic_type) << " could not be decoded" << std::endl; + s << error_s; + throw DynamicReflectionException(s.str()); + } + + return proto_msg_ptr; } - } - catch (DynamicReflectionException& e) - { - if (err_callback) - { - err_callback(e.what()); - } - } + + eCAL::protobuf::CProtoDynDecoder m_dynamic_decoder; + std::map> m_message_map; + }; } - /** - * Might throw DynamicReflectionException! - **/ - inline std::shared_ptr CDynamicSubscriber::CreateMessagePointer(const std::string& topic_name_) + namespace protobuf { - // get topic type - SDataTypeInformation topic_info; - eCAL::Util::GetTopicDataTypeInformation(topic_name, topic_info); - std::string topic_type{ topic_info.name }; - topic_type = topic_type.substr(topic_type.find_last_of('.') + 1, topic_type.size()); - if (StrEmptyOrNull(topic_type)) - { - throw DynamicReflectionException("CDynamicSubscriber: Could not get type for topic " + std::string(topic_name_)); - } - - std::string topic_desc = topic_info.descriptor; - if (StrEmptyOrNull(topic_desc)) - { - throw DynamicReflectionException("CDynamicSubscriber: Could not get description for topic " + std::string(topic_name_)); - } - - google::protobuf::FileDescriptorSet proto_desc; - proto_desc.ParseFromString(topic_desc); - std::string error_s; - std::shared_ptr proto_msg_ptr(msg_decoder->GetProtoMessageFromDescriptorSet(proto_desc, topic_type, error_s)); - if (proto_msg_ptr == nullptr) - { - std::stringstream s; - s << "CDynamicSubscriber: Message of type " + std::string(topic_name_) << " could not be decoded" << std::endl; - s << error_s; - throw DynamicReflectionException(s.str()); - } + /** + * @brief eCAL protobuf dynamic subscriber class. + * + * Dynamic subscriber class for protobuf messages. For details see documentation of CDynamicMessageSubscriber class. + * + **/ + using CDynamicSubscriber = CDynamicMessageSubscriber, internal::ProtobufDynamicDeserializer>; - return proto_msg_ptr; + /** @example proto_dyn_rec.cpp + * This is an example how to use eCAL::protobuf::CDynamicSubscriber to receive dynamic protobuf data with eCAL. To receive the data, see @ref proto_dyn_rec.cpp . + */ } - } } diff --git a/ecal/core/include/ecal/msg/protobuf/server.h b/ecal/core/include/ecal/msg/protobuf/server.h index cb0eb87a73..241bbe992c 100644 --- a/ecal/core/include/ecal/msg/protobuf/server.h +++ b/ecal/core/include/ecal/msg/protobuf/server.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ #pragma once #include -#include #include #include diff --git a/ecal/core/include/ecal/msg/protobuf/subscriber.h b/ecal/core/include/ecal/msg/protobuf/subscriber.h index b176805a47..cc8acd8c3c 100644 --- a/ecal/core/include/ecal/msg/protobuf/subscriber.h +++ b/ecal/core/include/ecal/msg/protobuf/subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,104 +44,23 @@ namespace eCAL { - namespace protobuf + namespace internal { - /** - * @brief eCAL google::protobuf subscriber class. - * - * Subscriber template class for google::protobuf messages. For details see documentation of CSubscriber class. - * - **/ template - class CSubscriber : public CMsgSubscriber + class ProtobufDeserializer { public: - /** - * @brief Constructor. - **/ - CSubscriber() : CMsgSubscriber() - { - } - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - - // call the function via its class because it's a virtual function that is called in constructor/destructor,- - // where the vtable is not created yet, or it's destructed. - explicit CSubscriber(const std::string& topic_name_) : CMsgSubscriber(topic_name_, CSubscriber::GetDataTypeInformation()) - { - } - - /** - * @brief Destructor - **/ - ~CSubscriber() override - { - this->Destroy(); - } - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber(const CSubscriber&) = delete; - - /** - * @brief Copy Assignment is not available. - **/ - CSubscriber& operator=(const CSubscriber&) = delete; - - /** - * @brief Move Constructor - **/ - CSubscriber(CSubscriber&&) = default; - - /** - * @brief Move assignment - **/ - CSubscriber& operator=(CSubscriber&&) = default; - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - bool Create(const std::string& topic_name_) - { - return(CMsgSubscriber::Create(topic_name_, GetDataTypeInformation())); - } - - private: - /** - * @brief Get topic information of the protobuf message. - * - * @return Topic information. - **/ - SDataTypeInformation GetDataTypeInformation() const override + static SDataTypeInformation GetDataTypeInformation() { SDataTypeInformation topic_info; static T msg{}; - topic_info.encoding = "proto"; - topic_info.name = msg.GetTypeName(); + topic_info.encoding = "proto"; + topic_info.name = msg.GetTypeName(); topic_info.descriptor = protobuf::GetProtoMessageDescription(msg); return topic_info; } - - /** - * @brief Deserialize the message object from a message buffer. - * - * @param [out] msg_ The message object. - * @param buffer_ Source buffer. - * @param size_ Source buffer size. - * - * @return True if it succeeds, false if it fails. - **/ - bool Deserialize(T& msg_, const void* buffer_, size_t size_) const override + static bool Deserialize(T& msg_, const void* buffer_, size_t size_) { // we try to parse the message from the received buffer if (msg_.ParseFromArray(buffer_, static_cast(size_))) @@ -150,10 +69,24 @@ namespace eCAL } return(false); } - }; - /** @example person_rec.cpp - * This is an example how to use eCAL::CSubscriber to receive google::protobuf data with eCAL. To send the data, see @ref person_snd.cpp . + } + + namespace protobuf + { + + /** + * @brief eCAL google::protobuf subscriber class. + * + * Subscriber template class for google::protobuf messages. For details see documentation of CSubscriber class. + * **/ + template + using CSubscriber = CMessageSubscriber>; + + /** @example person_rec.cpp + * This is an example how to use eCAL::CSubscriber to receive google::protobuf data with eCAL. To send the data, see @ref person_snd.cpp . + */ } } + diff --git a/ecal/core/include/ecal/msg/string/subscriber.h b/ecal/core/include/ecal/msg/string/subscriber.h index 949acf37b0..0747cf977a 100644 --- a/ecal/core/include/ecal/msg/string/subscriber.h +++ b/ecal/core/include/ecal/msg/string/subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,84 +32,13 @@ namespace eCAL { - namespace string + namespace internal { - /** - * @brief eCAL subscriber class for standard strings. - * - * Subscriber class for STL standard strings. For details see documentation of CSubscriber class. - * - **/ template - class CSubscriber : public CMsgSubscriber + class StringDeserializer { public: - /** - * @brief Constructor. - **/ - CSubscriber() : CMsgSubscriber() - { - } - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - **/ - - // call the function via its class because it's a virtual function that is called in constructor/destructor,- - // where the vtable is not created yet, or it's destructed. - explicit CSubscriber(const std::string& topic_name_) : CMsgSubscriber(topic_name_, CSubscriber::GetDataTypeInformation()) - { - } - - /** - * @brief Destructor - **/ - ~CSubscriber() override - { - this->Destroy(); - } - - /** - * @brief Copy Constructor is not available. - **/ - CSubscriber(const CSubscriber&) = delete; - - /** - * @brief Copy Assignment is not available. - **/ - CSubscriber& operator=(const CSubscriber&) = delete; - - /** - * @brief Move Constructor - **/ - CSubscriber(CSubscriber&&) = default; - - /** - * @brief Move assignment - **/ - CSubscriber& operator=(CSubscriber&&) = default; - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - bool Create(const std::string& topic_name_) - { - return(CMsgSubscriber::Create(topic_name_, GetDataTypeInformation())); - } - - private: - /** - * @brief Get topic information of the protobuf message. - * - * @return Topic information. ("base", "std::string", "") - **/ - SDataTypeInformation GetDataTypeInformation() const override + static SDataTypeInformation GetDataTypeInformation() { SDataTypeInformation topic_info; topic_info.encoding = "base"; @@ -118,24 +47,28 @@ namespace eCAL return topic_info; } - /** - * @brief Copy the string object from char buffer. - * - * @param [out] msg_ The message object. - * @param buffer_ Target buffer. - * @param size_ Target buffer size. - * - * @return True if it succeeds, false if it fails. - **/ - bool Deserialize(T& msg_, const void* buffer_, size_t size_) const override + static bool Deserialize(T& msg_, const void* buffer_, size_t size_) { msg_ = std::string(static_cast(buffer_), size_); return true; } - }; - /** @example minimal_rec.cpp - * This is an example how to use eCAL::CSubscriber to receive a std::string with eCAL. To send the strings, see @ref minimal_snd.cpp . + } + + namespace string + { + + /** + * @brief eCAL subscriber class for standard strings. + * + * Subscriber class for STL standard strings. For details see documentation of CSubscriber class. + * **/ + template + using CSubscriber = CMessageSubscriber>; + + /** @example minimal_rec.cpp + * This is an example how to use eCAL::CSubscriber to receive a std::string with eCAL. To send the strings, see @ref minimal_snd.cpp . + */ } } diff --git a/ecal/core/include/ecal/msg/subscriber.h b/ecal/core/include/ecal/msg/subscriber.h index 8322bdf87c..b59d926daa 100644 --- a/ecal/core/include/ecal/msg/subscriber.h +++ b/ecal/core/include/ecal/msg/subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ namespace eCAL * **/ template - class CMsgSubscriber : public CSubscriber + class [[deprecated("Please use CMessageSubscriber instead")]] CMsgSubscriber : public CSubscriber { public: /** @@ -229,4 +229,196 @@ namespace eCAL std::mutex m_cb_callback_mutex; MsgReceiveCallbackT m_cb_callback; }; + + /** + * @brief eCAL abstract message subscriber class. + * + * Abstract subscriber template class for messages. + * This class has two template arguments, the actual type and a deserializer class. + * The deserializer class is responsible for providing datatype information, and providing a method to convert from `void*` to `T`. + * This allows to specify classes with common deserializers, e.g. like a ProtobufMessageSubscriber, StringMessageSubscriber etc. + * + **/ + template + class CMessageSubscriber final : public CSubscriber + { + public: + /** + * @brief Constructor. + **/ + CMessageSubscriber() : CSubscriber() + { + } + + /** + * @brief Constructor. + * + * @param topic_name_ Unique topic name. + **/ + CMessageSubscriber(const std::string& topic_name_) : CSubscriber() + , m_deserializer() + { + SDataTypeInformation topic_info = m_deserializer.GetDataTypeInformation(); + CSubscriber::Create(topic_name_, topic_info); + } + + ~CMessageSubscriber() noexcept + { + Destroy(); + }; + + /** + * @brief Copy Constructor is not available. + **/ + CMessageSubscriber(const CMessageSubscriber&) = delete; + + /** + * @brief Copy Constructor is not available. + **/ + CMessageSubscriber& operator=(const CMessageSubscriber&) = delete; + + /** + * @brief Move Constructor + **/ + CMessageSubscriber(CMessageSubscriber&& rhs) + : CSubscriber(std::move(rhs)) + , m_cb_callback(std::move(rhs.m_cb_callback)) + , m_deserializer(std::move(rhs.m_deserializer)) + { + bool has_callback = (m_cb_callback != nullptr); + + if (has_callback) + { + // the callback bound to the CSubscriber belongs to rhs, bind to this callback instead + CSubscriber::RemReceiveCallback(); + auto callback = std::bind(&CMessageSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2); + CSubscriber::AddReceiveCallback(callback); + } + } + + /** + * @brief Move assignment + **/ + CMessageSubscriber& operator=(CMessageSubscriber&& rhs) + { + Destroy(); + + CSubscriber::operator=(std::move(rhs)); + + m_cb_callback = std::move(rhs.m_cb_callback); + m_deserializer = std::move(rhs.m_deserializer); + + bool has_callback(m_cb_callback != nullptr); + + if (has_callback) + { + // the callback bound to the CSubscriber belongs to rhs, bind to this callback instead; + CSubscriber::RemReceiveCallback(); + auto callback = std::bind(&CMessageSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2); + CSubscriber::AddReceiveCallback(callback); + } + + return *this; + } + + /** + * @brief Destroys this object. + * + * @return true if it succeeds, false if it fails. + **/ + bool Destroy() + { + RemReceiveCallback(); + return(CSubscriber::Destroy()); + } + + /** + * @brief Receive deserialized message. + * + * @param [out] msg_ The message object. + * @param [out] time_ Optional receive time stamp. + * @param rcv_timeout_ Receive timeout in ms. + * + * @return True if a message could received, false otherwise. + **/ + bool Receive(T& msg_, long long* time_ = nullptr, int rcv_timeout_ = 0) + { + std::string rec_buf; + bool success = CSubscriber::ReceiveBuffer(rec_buf, time_, rcv_timeout_); + if (!success) return(false); + // In the future, I would like to get m_datatype_info from the ReceiveBuffer fuunction! + return(m_deserializer.Deserialize(msg_, rec_buf.c_str(), rec_buf.size())); + } + + /** + * @brief eCAL message receive callback function + * + * @param topic_name_ Topic name of the data source (publisher). + * @param msg_ Message content. + * @param time_ Message time stamp. + * @param clock_ Message writer clock. + * @param id_ Message id. + **/ + typedef std::function MsgReceiveCallbackT; + + /** + * @brief Add receive callback for incoming messages. + * + * @param callback_ The callback function. + * + * @return True if it succeeds, false if it fails. + **/ + bool AddReceiveCallback(MsgReceiveCallbackT callback_) + { + RemReceiveCallback(); + + { + std::lock_guard callback_lock(m_cb_callback_mutex); + m_cb_callback = callback_; + } + auto callback = std::bind(&CMessageSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2); + return(CSubscriber::AddReceiveCallback(callback)); + } + + /** + * @brief Remove receive callback for incoming messages. + * + * @return True if it succeeds, false if it fails. + **/ + bool RemReceiveCallback() + { + bool ret = CSubscriber::RemReceiveCallback(); + + std::lock_guard callback_lock(m_cb_callback_mutex); + if (m_cb_callback == nullptr) return(false); + m_cb_callback = nullptr; + return(ret); + } + + private: + void ReceiveCallback(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_) + { + MsgReceiveCallbackT fn_callback = nullptr; + { + std::lock_guard callback_lock(m_cb_callback_mutex); + fn_callback = m_cb_callback; + } + + if (fn_callback == nullptr) return; + + T msg; + // In the future, I would like to get m_datatype_info from the ReceiveBuffer function! + if (m_deserializer.Deserialize(msg, data_->buf, data_->size)) + { + (fn_callback)(topic_name_, msg, data_->time, data_->clock, data_->id); + } + } + + std::mutex m_cb_callback_mutex; + MsgReceiveCallbackT m_cb_callback; + Deserializer m_deserializer; + }; + + + } diff --git a/ecal/samples/cpp/pubsub/protobuf/person_rec/CMakeLists.txt b/ecal/samples/cpp/pubsub/protobuf/person_rec/CMakeLists.txt index 8925dd042b..303df38810 100644 --- a/ecal/samples/cpp/pubsub/protobuf/person_rec/CMakeLists.txt +++ b/ecal/samples/cpp/pubsub/protobuf/person_rec/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ target_link_libraries(${PROJECT_NAME} eCAL::core_protobuf ) -target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) ecal_install_sample(${PROJECT_NAME}) diff --git a/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/CMakeLists.txt b/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/CMakeLists.txt index 0bff96455d..2e62b42879 100644 --- a/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/CMakeLists.txt +++ b/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -38,7 +38,7 @@ target_link_libraries(${PROJECT_NAME} eCAL::core_protobuf ) -target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) ecal_install_sample(${PROJECT_NAME}) diff --git a/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/src/proto_dyn_rec.cpp b/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/src/proto_dyn_rec.cpp index f597681421..8863f64922 100644 --- a/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/src/proto_dyn_rec.cpp +++ b/ecal/samples/cpp/pubsub/protobuf/proto_dyn_rec/src/proto_dyn_rec.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -272,9 +272,9 @@ void ProcProtoMsg(const google::protobuf::Message& msg_, const std::string& pref } } -void ProtoMsgCallback(const char* topic_name_, const google::protobuf::Message& msg_) +void ProtoMsgCallback(const char* topic_name_, const std::shared_ptr& msg_) { - ProcProtoMsg(msg_, topic_name_); + ProcProtoMsg(*msg_, topic_name_); std::cout << std::endl; } diff --git a/ecal/tests/cpp/pubsub_proto_test/CMakeLists.txt b/ecal/tests/cpp/pubsub_proto_test/CMakeLists.txt index cc0c9af257..d012019194 100644 --- a/ecal/tests/cpp/pubsub_proto_test/CMakeLists.txt +++ b/ecal/tests/cpp/pubsub_proto_test/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ target_link_libraries(${PROJECT_NAME} ) -target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) ecal_install_gtest(${PROJECT_NAME}) diff --git a/ecal/tests/cpp/pubsub_proto_test/src/proto_dyn_subscriber_test.cpp b/ecal/tests/cpp/pubsub_proto_test/src/proto_dyn_subscriber_test.cpp index 3f17615b36..e44071b29f 100644 --- a/ecal/tests/cpp/pubsub_proto_test/src/proto_dyn_subscriber_test.cpp +++ b/ecal/tests/cpp/pubsub_proto_test/src/proto_dyn_subscriber_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,7 +58,7 @@ class ProtoDynSubscriberTest : public ::testing::Test { pub.Send(p); } - void OnPerson(const char*, const google::protobuf::Message&, long long) + void OnPerson(const char* /*topic_name*/, const std::shared_ptr& /*message*/, long long /*time*/) { received_callbacks++; } @@ -122,12 +122,9 @@ TEST_F(core_cpp_pubsub_proto_dyn, ProtoDynSubscriberTest_SendReceive) SendPerson(person_pub); std::this_thread::sleep_for(std::chrono::milliseconds(100)); - google::protobuf::Message* message = person_dyn_rec.getMessagePointer(); - ASSERT_NE(message, nullptr) << "pointer returned by dynamic subscriber may not be null"; - - bool received = person_dyn_rec.Receive(*message, nullptr, 500); + auto received_message = person_dyn_rec.Receive(nullptr, 500); // assert that the OnPerson callback has been called once. - ASSERT_TRUE(received) << "we should have received data that was sent"; - auto id = extract_id(*message); - ASSERT_EQ(id, 1); + ASSERT_TRUE(received_message.has_value()) << "we should have received data that was sent"; + auto id = extract_id(*received_message.value()); + ASSERT_EQ(id, 1) << "Extracted ID needs to be 1"; } diff --git a/samples/cpp/pubsub/capnp/addressbook_rec/CMakeLists.txt b/samples/cpp/pubsub/capnp/addressbook_rec/CMakeLists.txt index 28cf9e8dfd..a40923fdee 100644 --- a/samples/cpp/pubsub/capnp/addressbook_rec/CMakeLists.txt +++ b/samples/cpp/pubsub/capnp/addressbook_rec/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/s target_link_libraries(${PROJECT_NAME} PRIVATE CapnProto::capnp eCAL::core) target_link_options(${PROJECT_NAME} PRIVATE $<$:/ignore:4099>) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) + ecal_install_sample(${PROJECT_NAME}) set_property(TARGET ${PROJECT_NAME} PROPERTY FOLDER samples/cpp/pubsub/capnproto) diff --git a/samples/cpp/pubsub/capnp/addressbook_rec/src/addressbook_rec.cpp b/samples/cpp/pubsub/capnp/addressbook_rec/src/addressbook_rec.cpp index 9d7f43fe91..d35e448099 100644 --- a/samples/cpp/pubsub/capnp/addressbook_rec/src/addressbook_rec.cpp +++ b/samples/cpp/pubsub/capnp/addressbook_rec/src/addressbook_rec.cpp @@ -88,9 +88,9 @@ int main(int argc, char **argv) while (eCAL::Ok()) { // receive content - if (sub.Receive(nullptr, 0)) + AddressBook::Reader reader; + if (sub.Receive(reader)) { - AddressBook::Reader reader{ sub.getReader() }; printAddressBook(reader); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/samples/cpp/pubsub/capnp/addressbook_rec_cb/CMakeLists.txt b/samples/cpp/pubsub/capnp/addressbook_rec_cb/CMakeLists.txt index 7719484ef9..002123ae36 100644 --- a/samples/cpp/pubsub/capnp/addressbook_rec_cb/CMakeLists.txt +++ b/samples/cpp/pubsub/capnp/addressbook_rec_cb/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/s target_link_libraries(${PROJECT_NAME} PRIVATE CapnProto::capnp eCAL::core) target_link_options(${PROJECT_NAME} PRIVATE $<$:/ignore:4099>) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) + ecal_install_sample(${PROJECT_NAME}) set_property(TARGET ${PROJECT_NAME} PROPERTY FOLDER samples/cpp/pubsub/capnproto) \ No newline at end of file diff --git a/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/CMakeLists.txt b/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/CMakeLists.txt index cdb1817eeb..1992303cc8 100644 --- a/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/CMakeLists.txt +++ b/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,6 +32,8 @@ target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/c target_link_libraries(${PROJECT_NAME} PRIVATE CapnProto::capnp eCAL::core) target_link_options(${PROJECT_NAME} PRIVATE $<$:/ignore:4099>) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) + ecal_install_sample(${PROJECT_NAME}) set_property(TARGET ${PROJECT_NAME} PROPERTY FOLDER samples/cpp/pubsub/capnproto) diff --git a/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp b/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp index c25c8f3d38..d5562fd031 100644 --- a/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp +++ b/samples/cpp/pubsub/capnp/addressbook_rec_dynamic/src/addressbook_rec_dynamic.cpp @@ -105,7 +105,7 @@ void dynamicPrintValue(const capnp::DynamicValue::Reader& value) } } -int main(int argc, char **argv) +int main(int argc, char** argv) { // initialize eCAL API eCAL::Initialize(argc, argv, "addressbook subscriber"); @@ -115,8 +115,10 @@ int main(int argc, char **argv) // create a subscriber (topic name "addressbook") eCAL::capnproto::CDynamicSubscriber sub("addressbook"); - auto callback = std::bind(dynamicPrintValue, std::placeholders::_2); - sub.AddReceiveCallback(callback); + auto lambda = [](const char* /*topic_name_*/, const capnp::DynamicValue::Reader& msg_, long long /*time_*/, long long /*clock_*/, long long /*id_*/) -> void { + dynamicPrintValue(msg_); + }; + sub.AddReceiveCallback(lambda); // enter main loop while (eCAL::Ok()) diff --git a/samples/cpp/pubsub/capnp/addressbook_snd/CMakeLists.txt b/samples/cpp/pubsub/capnp/addressbook_snd/CMakeLists.txt index 89a7571036..f49bbdc611 100644 --- a/samples/cpp/pubsub/capnp/addressbook_snd/CMakeLists.txt +++ b/samples/cpp/pubsub/capnp/addressbook_snd/CMakeLists.txt @@ -1,6 +1,6 @@ # ========================= eCAL LICENSE ================================= # -# Copyright (C) 2016 - 2019 Continental Corporation +# Copyright (C) 2016 - 2024 Continental Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ target_include_directories(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/s target_link_libraries(${PROJECT_NAME} PRIVATE CapnProto::capnp eCAL::core) target_link_options(${PROJECT_NAME} PRIVATE $<$:/ignore:4099>) +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17) + ecal_install_sample(${PROJECT_NAME}) set_property(TARGET ${PROJECT_NAME} PROPERTY FOLDER samples/cpp/pubsub/capnproto)