From a6d6f1fbae2c910e50372c3b58b6d63e6f8d7ce7 Mon Sep 17 00:00:00 2001 From: Rex Schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Mon, 16 Dec 2024 16:41:16 +0100 Subject: [PATCH] [core] redesign publisher api (#1858) * legacy CPublisher moved to eCAL::v5::CPublisher * new eCAL::CPublisher with reduced API --- .../play_core/src/measurement_container.h | 3 +- ecal/core/CMakeLists.txt | 6 +- ecal/core/include/ecal/cimpl/ecal_log_cimpl.h | 2 +- ecal/core/include/ecal/ecal_log.h | 2 +- ecal/core/include/ecal/ecal_publisher.h | 409 ++++++------------ ecal/core/include/ecal/ecal_publisher_v5.h | 323 ++++++++++++++ .../include/ecal/msg/capnproto/publisher.h | 14 +- .../ecal/msg/protobuf/dynamic_publisher.h | 4 +- .../include/ecal/msg/protobuf/publisher.h | 16 +- ecal/core/include/ecal/msg/publisher.h | 14 +- ecal/core/src/cimpl/ecal_log_cimpl.cpp | 2 +- ecal/core/src/cimpl/ecal_publisher_cimpl.cpp | 23 +- ecal/core/src/logging/ecal_log.cpp | 2 +- ecal/core/src/pubsub/ecal_pubgate.cpp | 51 +-- ecal/core/src/pubsub/ecal_pubgate.h | 16 +- ecal/core/src/pubsub/ecal_publisher.cpp | 197 ++------- .../ecal_publisher_impl.cpp} | 240 +++++----- .../ecal_publisher_impl.h} | 54 ++- ecal/core/src/pubsub/ecal_publisher_v5.cpp | 248 +++++++++++ ...ecal_registration_sample_applier_gates.cpp | 4 +- .../counter_snd/src/counter_snd.cpp | 5 +- .../datarate_snd/src/datarate_snd.cpp | 7 +- .../latency_snd/src/latency_snd.cpp | 4 +- .../multiple_snd/src/multiple_snd.cpp | 6 +- .../performance_snd/src/performance_snd.cpp | 7 +- .../src/pubsub_throughput.cpp | 2 +- .../binary/binary_snd/src/binary_snd.cpp | 2 +- .../src/proto_publisher_test.cpp | 2 +- .../src/proto_subscriber_test.cpp | 9 +- .../src/pubsub_callback_topicid.cpp | 4 +- .../src/pubsub_connection_test.cpp | 10 +- .../pubsub_test/src/pubsub_multibuffer.cpp | 4 +- .../tests/cpp/pubsub_test/src/pubsub_test.cpp | 73 +--- .../cpp/pubsub_test/src/pubsub_test_shm.cpp | 16 +- .../cpp/pubsub_test/src/pubsub_test_udp.cpp | 18 +- .../src/registration_gettopics.cpp | 16 +- .../csharp/Continental.eCAL.Core/ecal_clr.cpp | 6 +- lang/csharp/Continental.eCAL.Core/ecal_clr.h | 4 +- lang/python/core/src/ecal_clang.cpp | 11 +- 39 files changed, 1031 insertions(+), 805 deletions(-) create mode 100644 ecal/core/include/ecal/ecal_publisher_v5.h rename ecal/core/src/{readwrite/ecal_writer.cpp => pubsub/ecal_publisher_impl.cpp} (78%) rename ecal/core/src/{readwrite/ecal_writer.h => pubsub/ecal_publisher_impl.h} (72%) create mode 100644 ecal/core/src/pubsub/ecal_publisher_v5.cpp diff --git a/app/play/play_core/src/measurement_container.h b/app/play/play_core/src/measurement_container.h index 1925da8ce8..dad5036590 100644 --- a/app/play/play_core/src/measurement_container.h +++ b/app/play/play_core/src/measurement_container.h @@ -24,6 +24,7 @@ #include #include +#include #include #include "continuity_report.h" @@ -90,7 +91,7 @@ class MeasurementContainer private: struct PublisherInfo { - eCAL::CPublisher publisher_; + eCAL::v5::CPublisher publisher_; long long message_counter_; PublisherInfo(const std::string& topic_name, const eCAL::SDataTypeInformation& info_) diff --git a/ecal/core/CMakeLists.txt b/ecal/core/CMakeLists.txt index fa43f791df..2b8412762b 100644 --- a/ecal/core/CMakeLists.txt +++ b/ecal/core/CMakeLists.txt @@ -204,6 +204,9 @@ endif() if(ECAL_CORE_PUBLISHER) set(ecal_pub_src src/pubsub/ecal_publisher.cpp + src/pubsub/ecal_publisher_impl.cpp + src/pubsub/ecal_publisher_impl.h + src/pubsub/ecal_publisher_v5.cpp src/pubsub/ecal_pubgate.cpp src/pubsub/ecal_pubgate.h ) @@ -226,8 +229,6 @@ set(ecal_readwrite_src if(ECAL_CORE_PUBLISHER) set(ecal_writer_src - src/readwrite/ecal_writer.cpp - src/readwrite/ecal_writer.h src/readwrite/ecal_writer_base.h src/readwrite/ecal_writer_buffer_payload.h src/readwrite/ecal_writer_data.h @@ -538,6 +539,7 @@ set(ecal_header_cmn include/ecal/ecal_process_severity.h include/ecal/ecal_registration.h include/ecal/ecal_publisher.h + include/ecal/ecal_publisher_v5.h include/ecal/ecal_server.h include/ecal/ecal_server_v5.h include/ecal/ecal_service_info.h diff --git a/ecal/core/include/ecal/cimpl/ecal_log_cimpl.h b/ecal/core/include/ecal/cimpl/ecal_log_cimpl.h index d7522e3536..48eaa8048d 100644 --- a/ecal/core/include/ecal/cimpl/ecal_log_cimpl.h +++ b/ecal/core/include/ecal/cimpl/ecal_log_cimpl.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/core/include/ecal/ecal_log.h b/ecal/core/include/ecal/ecal_log.h index d103021c56..8e8854f8c1 100644 --- a/ecal/core/include/ecal/ecal_log.h +++ b/ecal/core/include/ecal/ecal_log.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/core/include/ecal/ecal_publisher.h b/ecal/core/include/ecal/ecal_publisher.h index f48e94a84d..357459be8b 100644 --- a/ecal/core/include/ecal/ecal_publisher.h +++ b/ecal/core/include/ecal/ecal_publisher.h @@ -24,11 +24,12 @@ #pragma once -#include #include #include -#include + +#include #include +#include #include #include @@ -39,282 +40,136 @@ namespace eCAL { - class CDataWriter; + class CPublisherImpl; - /** - * @brief eCAL publisher class. - * - * The CPublisher class is used to send topics to matching eCAL subscribers. The topic is created automatically by the constructor - * or by the Create member function. - *
- *
- * For sending the topic payload the publisher class provides an overloaded Send method. The first one is sending the payload as - * a std::string. The second needs a preallocated buffer described by a buffer address and a buffer length. The publisher is not - * taking the ownership for the allocated memory buffer. - *
- *
- * An optional time stamp can be attached to the topic payload. - * - **/ - /** - * @code - * // create publisher, topic name "A" - * eCAL::CPublisher pub("A"); - * - * // send string - * std::string send_s = "Hello World "; - * - * // send content - * size_t snd_len = pub.Send(send_s); - * @endcode - **/ - class ECAL_API_CLASS CPublisher + inline namespace v6 { - public: - ECAL_API_EXPORTED_MEMBER - static constexpr long long DEFAULT_TIME_ARGUMENT = -1; /*!< Use DEFAULT_TIME_ARGUMENT in the `Send()` function to let eCAL determine the send timestamp */ - - /** - * @brief Constructor. - **/ - ECAL_API_EXPORTED_MEMBER - CPublisher(); - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - * @param data_type_info_ Topic data type information (encoding, type, descriptor). - * @param config_ Optional configuration parameters. - **/ - ECAL_API_EXPORTED_MEMBER - CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = GetPublisherConfiguration()); - - /** - * @brief Constructor. - * - * @param topic_name_ Unique topic name. - * @param config_ Optional configuration parameters. - **/ - ECAL_API_EXPORTED_MEMBER - explicit CPublisher(const std::string& topic_name_, const Publisher::Configuration& config_ = GetPublisherConfiguration()); - - /** - * @brief Destructor. - **/ - ECAL_API_EXPORTED_MEMBER - virtual ~CPublisher(); - - /** - * @brief CPublishers are non-copyable - **/ - CPublisher(const CPublisher&) = delete; - - /** - * @brief CPublishers are non-copyable - **/ - CPublisher& operator=(const CPublisher&) = delete; - - /** - * @brief CPublishers are move-enabled - **/ - ECAL_API_EXPORTED_MEMBER - CPublisher(CPublisher&& rhs) noexcept; - - /** - * @brief CPublishers are move-enabled - **/ - ECAL_API_EXPORTED_MEMBER - CPublisher& operator=(CPublisher&& rhs) noexcept; - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * @param data_type_info_ Topic data type information (encoding, type, descriptor). - * @param config_ Optional configuration parameters. - * - * @return True if it succeeds, false if it fails. - **/ - ECAL_API_EXPORTED_MEMBER - bool Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = GetPublisherConfiguration()); - - /** - * @brief Creates this object. - * - * @param topic_name_ Unique topic name. - * - * @return True if it succeeds, false if it fails. - **/ - ECAL_API_EXPORTED_MEMBER - bool Create(const std::string& topic_name_); - - /** - * @brief Destroys this object. - * - * @return True if it succeeds, false if it fails. - **/ - ECAL_API_EXPORTED_MEMBER - bool Destroy(); - - /** - * @brief Setup topic information. - * - * @param data_type_info_ Topic data type information attributes. - * - * @return True if it succeeds, false if it fails. - **/ - ECAL_API_EXPORTED_MEMBER - bool SetDataTypeInformation(const SDataTypeInformation& data_type_info_); - - /** - * @brief Sets publisher attribute. - * - * @param attr_name_ Attribute name. - * @param attr_value_ Attribute value. - * - * @return True if it succeeds, false if it fails. - **/ - ECAL_API_EXPORTED_MEMBER - bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_); - - /** - * @brief Removes publisher attribute. - * - * @param attr_name_ Attribute name. - * - * @return True if it succeeds, false if it fails. - * @experimental - **/ - ECAL_API_EXPORTED_MEMBER - bool ClearAttribute(const std::string& attr_name_); - - /** - * @brief Set the specific topic filter id. - * - * @param filter_id_ The topic id for subscriber side filtering (0 == no id). - * - * @return True if it succeeds, false if it fails. - **/ - ECAL_API_EXPORTED_MEMBER - bool SetID(long long filter_id_); - - /** - * @brief Send a message to all subscribers. - * - * @param buf_ Pointer to content buffer. - * @param len_ Length of buffer. - * @param time_ Send time (-1 = use eCAL system time in us, default = -1). - * - * @return Number of bytes sent. - **/ - ECAL_API_EXPORTED_MEMBER - size_t Send(const void* buf_, size_t len_, long long time_ = DEFAULT_TIME_ARGUMENT); - /** - * @brief Send a message to all subscribers. - * - * @param payload_ Payload. - * @param time_ Send time (-1 = use eCAL system time in us, default = -1). - * - * @return Number of bytes sent. - **/ - ECAL_API_EXPORTED_MEMBER - size_t Send(CPayloadWriter& payload_, long long time_ = DEFAULT_TIME_ARGUMENT); - - /** - * @brief Send a message to all subscribers. - * - * @param s_ String that contains content to send. - * @param time_ Send time (-1 = use eCAL system time in us, default = -1). - * - * @return Number of bytes sent. - **/ - ECAL_API_EXPORTED_MEMBER - size_t Send(const std::string& s_, long long time_ = DEFAULT_TIME_ARGUMENT); - - /** - * @brief Add callback function for publisher events. - * - * @param type_ The event type to react on. - * @param callback_ The callback function to add. - * - * @return True if succeeded, false if not. - **/ - ECAL_API_EXPORTED_MEMBER - bool AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_); - - /** - * @brief Remove callback function for publisher events. - * - * @param type_ The event type to remove. - * - * @return True if succeeded, false if not. - **/ - ECAL_API_EXPORTED_MEMBER - bool RemEventCallback(eCAL_Publisher_Event type_); - - /** - * @brief Query if the publisher is created. - * - * @return True if created, false if not. - **/ - ECAL_API_EXPORTED_MEMBER - bool IsCreated() const { return(m_datawriter != nullptr); } - - /** - * @brief Query if the publisher is subscribed. - * - * @return true if subscribed, false if not. - **/ - ECAL_API_EXPORTED_MEMBER - bool IsSubscribed() const; - - /** - * @brief Query the number of subscribers. - * - * @return Number of subscribers. - **/ - ECAL_API_EXPORTED_MEMBER - size_t GetSubscriberCount() const; - - /** - * @brief Gets name of the connected topic. - * - * @return The topic name. - **/ - ECAL_API_EXPORTED_MEMBER - std::string GetTopicName() const; - - /** - * @brief Gets a unique ID of this Publisher - * - * @return The topic id. - **/ - ECAL_API_EXPORTED_MEMBER - Registration::STopicId GetId() const; - - /** - * @brief Gets description of the connected topic. - * - * @return The topic information. - **/ - ECAL_API_EXPORTED_MEMBER - SDataTypeInformation GetDataTypeInformation() const; - - /** - * @brief Dump the whole class state into a string. - * - * @param indent_ Indentation used for dump. - * - * @return The dump string. - **/ - ECAL_API_EXPORTED_MEMBER - std::string Dump(const std::string& indent_ = "") const; - - private: - // class members - std::shared_ptr m_datawriter; - long long m_filter_id; - }; + * @brief eCAL publisher class. + **/ + class ECAL_API_CLASS CPublisher + { + public: + ECAL_API_EXPORTED_MEMBER + static constexpr long long DEFAULT_TIME_ARGUMENT = -1; /*!< Use DEFAULT_TIME_ARGUMENT in the `Send()` function to let eCAL determine the send timestamp */ + + /** + * @brief Constructor. + * + * @param topic_name_ Unique topic name. + * @param data_type_info_ Topic data type information (encoding, type, descriptor). + * @param config_ Optional configuration parameters. + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_ = SDataTypeInformation(), const Publisher::Configuration& config_ = GetPublisherConfiguration()); + + /** + * @brief Constructor. + * + * @param topic_name_ Unique topic name. + * @param data_type_info_ Topic data type information (encoding, type, descriptor). + * @param event_callback_ The publisher event callback funtion. + * @param config_ Optional configuration parameters. + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const PubEventIDCallbackT event_callback_, const Publisher::Configuration& config_ = GetPublisherConfiguration()); + + /** + * @brief Destructor. + **/ + ECAL_API_EXPORTED_MEMBER + virtual ~CPublisher(); + + /** + * @brief CPublishers are non-copyable + **/ + CPublisher(const CPublisher&) = delete; + + /** + * @brief CPublishers are non-copyable + **/ + CPublisher& operator=(const CPublisher&) = delete; + + /** + * @brief CPublishers are move-enabled + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher(CPublisher&& rhs) noexcept; + + /** + * @brief CPublishers are move-enabled + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher& operator=(CPublisher&& rhs) noexcept; + + /** + * @brief Send a message to all subscribers. + * + * @param buf_ Pointer to content buffer. + * @param len_ Length of buffer. + * @param time_ Send time (-1 = use eCAL system time in us, default = -1). + * + * @return True if succeeded, false if not. + **/ + ECAL_API_EXPORTED_MEMBER + bool Send(const void* buf_, size_t len_, long long time_ = DEFAULT_TIME_ARGUMENT); + + /** + * @brief Send a message to all subscribers. + * + * @param payload_ Payload writer. + * @param time_ Send time (-1 = use eCAL system time in us, default = -1). + * + * @return True if succeeded, false if not. + **/ + ECAL_API_EXPORTED_MEMBER + bool Send(CPayloadWriter& payload_, long long time_ = DEFAULT_TIME_ARGUMENT); + + /** + * @brief Send a message to all subscribers. + * + * @param payload_ Payload string. + * @param time_ Send time (-1 = use eCAL system time in us, default = -1). + * + * @return True if succeeded, false if not. + **/ + ECAL_API_EXPORTED_MEMBER + bool Send(const std::string& payload_, long long time_ = DEFAULT_TIME_ARGUMENT); + + /** + * @brief Query the number of subscribers. + * + * @return Number of subscribers. + **/ + ECAL_API_EXPORTED_MEMBER + size_t GetSubscriberCount() const; + + /** + * @brief Retrieve the topic name. + * + * @return The topic name. + **/ + ECAL_API_EXPORTED_MEMBER + std::string GetTopicName() const; + + /** + * @brief Retrieve the topic id. + * + * @return The topic id. + **/ + ECAL_API_EXPORTED_MEMBER + Registration::STopicId GetTopicId() const; + + /** + * @brief Gets description of the connected topic. + * + * @return The topic information. + **/ + ECAL_API_EXPORTED_MEMBER + SDataTypeInformation GetDataTypeInformation() const; + + private: + std::shared_ptr m_publisher_impl; + }; + } } diff --git a/ecal/core/include/ecal/ecal_publisher_v5.h b/ecal/core/include/ecal/ecal_publisher_v5.h new file mode 100644 index 0000000000..2c2fbef67c --- /dev/null +++ b/ecal/core/include/ecal/ecal_publisher_v5.h @@ -0,0 +1,323 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +/** + * @file ecal_publisher_v5.h + * @brief eCAL publisher interface (deprecated eCAL5 version) +**/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace eCAL +{ + class CPublisherImpl; + + namespace v5 + { + /** + * @brief eCAL publisher class. + * + * The CPublisher class is used to send topics to matching eCAL subscribers. The topic is created automatically by the constructor + * or by the Create member function. + *
+ *
+ * For sending the topic payload the publisher class provides an overloaded Send method. The first one is sending the payload as + * a std::string. The second needs a preallocated buffer described by a buffer address and a buffer length. The publisher is not + * taking the ownership for the allocated memory buffer. + *
+ *
+ * An optional time stamp can be attached to the topic payload. + * + **/ + /** + * @code + * // create publisher, topic name "A" + * eCAL::CPublisher pub("A"); + * + * // send string + * std::string send_s = "Hello World "; + * + * // send content + * size_t snd_len = pub.Send(send_s); + * @endcode + **/ + class ECAL_API_CLASS CPublisher + { + public: + ECAL_API_EXPORTED_MEMBER + static constexpr long long DEFAULT_TIME_ARGUMENT = -1; /*!< Use DEFAULT_TIME_ARGUMENT in the `Send()` function to let eCAL determine the send timestamp */ + + /** + * @brief Constructor. + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher(); + + /** + * @brief Constructor. + * + * @param topic_name_ Unique topic name. + * @param data_type_info_ Topic data type information (encoding, type, descriptor). + * @param config_ Optional configuration parameters. + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = GetPublisherConfiguration()); + + /** + * @brief Constructor. + * + * @param topic_name_ Unique topic name. + * @param config_ Optional configuration parameters. + **/ + ECAL_API_EXPORTED_MEMBER + explicit CPublisher(const std::string& topic_name_, const Publisher::Configuration& config_ = GetPublisherConfiguration()); + + /** + * @brief Destructor. + **/ + ECAL_API_EXPORTED_MEMBER + virtual ~CPublisher(); + + /** + * @brief CPublishers are non-copyable + **/ + CPublisher(const CPublisher&) = delete; + + /** + * @brief CPublishers are non-copyable + **/ + CPublisher& operator=(const CPublisher&) = delete; + + /** + * @brief CPublishers are move-enabled + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher(CPublisher&& rhs) noexcept; + + /** + * @brief CPublishers are move-enabled + **/ + ECAL_API_EXPORTED_MEMBER + CPublisher& operator=(CPublisher&& rhs) noexcept; + + /** + * @brief Creates this object. + * + * @param topic_name_ Unique topic name. + * @param data_type_info_ Topic data type information (encoding, type, descriptor). + * @param config_ Optional configuration parameters. + * + * @return True if it succeeds, false if it fails. + **/ + ECAL_API_EXPORTED_MEMBER + bool Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = GetPublisherConfiguration()); + + /** + * @brief Creates this object. + * + * @param topic_name_ Unique topic name. + * + * @return True if it succeeds, false if it fails. + **/ + ECAL_API_EXPORTED_MEMBER + bool Create(const std::string& topic_name_); + + /** + * @brief Destroys this object. + * + * @return True if it succeeds, false if it fails. + **/ + ECAL_API_EXPORTED_MEMBER + bool Destroy(); + + /** + * @brief Setup topic information. + * + * @param data_type_info_ Topic data type information attributes. + * + * @return True if it succeeds, false if it fails. + **/ + ECAL_API_EXPORTED_MEMBER + bool SetDataTypeInformation(const SDataTypeInformation& data_type_info_); + + /** + * @brief Sets publisher attribute. + * + * @param attr_name_ Attribute name. + * @param attr_value_ Attribute value. + * + * @return True if it succeeds, false if it fails. + **/ + ECAL_API_EXPORTED_MEMBER + bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_); + + /** + * @brief Removes publisher attribute. + * + * @param attr_name_ Attribute name. + * + * @return True if it succeeds, false if it fails. + * @experimental + **/ + ECAL_API_EXPORTED_MEMBER + bool ClearAttribute(const std::string& attr_name_); + + /** + * @brief Set the specific topic filter id. + * + * @param filter_id_ The topic id for subscriber side filtering (0 == no id). + * + * @return True if it succeeds, false if it fails. + **/ + ECAL_API_EXPORTED_MEMBER + bool SetID(long long filter_id_); + + /** + * @brief Send a message to all subscribers. + * + * @param buf_ Pointer to content buffer. + * @param len_ Length of buffer. + * @param time_ Send time (-1 = use eCAL system time in us, default = -1). + * + * @return Number of bytes sent. + **/ + ECAL_API_EXPORTED_MEMBER + size_t Send(const void* buf_, size_t len_, long long time_ = DEFAULT_TIME_ARGUMENT); + + /** + * @brief Send a message to all subscribers. + * + * @param payload_ Payload. + * @param time_ Send time (-1 = use eCAL system time in us, default = -1). + * + * @return Number of bytes sent. + **/ + ECAL_API_EXPORTED_MEMBER + size_t Send(CPayloadWriter& payload_, long long time_ = DEFAULT_TIME_ARGUMENT); + + /** + * @brief Send a message to all subscribers. + * + * @param s_ String that contains content to send. + * @param time_ Send time (-1 = use eCAL system time in us, default = -1). + * + * @return Number of bytes sent. + **/ + ECAL_API_EXPORTED_MEMBER + size_t Send(const std::string& s_, long long time_ = DEFAULT_TIME_ARGUMENT); + + /** + * @brief Add callback function for publisher events. + * + * @param type_ The event type to react on. + * @param callback_ The callback function to add. + * + * @return True if succeeded, false if not. + **/ + ECAL_API_EXPORTED_MEMBER + bool AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_); + + /** + * @brief Remove callback function for publisher events. + * + * @param type_ The event type to remove. + * + * @return True if succeeded, false if not. + **/ + ECAL_API_EXPORTED_MEMBER + bool RemEventCallback(eCAL_Publisher_Event type_); + + /** + * @brief Query if the publisher is created. + * + * @return True if created, false if not. + **/ + ECAL_API_EXPORTED_MEMBER + bool IsCreated() const { return(m_publisher_impl != nullptr); } + + /** + * @brief Query if the publisher is subscribed. + * + * @return true if subscribed, false if not. + **/ + ECAL_API_EXPORTED_MEMBER + bool IsSubscribed() const; + + /** + * @brief Query the number of subscribers. + * + * @return Number of subscribers. + **/ + ECAL_API_EXPORTED_MEMBER + size_t GetSubscriberCount() const; + + /** + * @brief Gets name of the connected topic. + * + * @return The topic name. + **/ + ECAL_API_EXPORTED_MEMBER + std::string GetTopicName() const; + + /** + * @brief Gets a unique ID of this Publisher + * + * @return The topic id. + **/ + ECAL_API_EXPORTED_MEMBER + Registration::STopicId GetId() const; + + /** + * @brief Gets description of the connected topic. + * + * @return The topic information. + **/ + ECAL_API_EXPORTED_MEMBER + SDataTypeInformation GetDataTypeInformation() const; + + /** + * @brief Dump the whole class state into a string. + * + * @param indent_ Indentation used for dump. + * + * @return The dump string. + **/ + ECAL_API_EXPORTED_MEMBER + std::string Dump(const std::string& indent_ = "") const; + + private: + // class members + std::shared_ptr m_publisher_impl; + long long m_filter_id; + }; + } +} diff --git a/ecal/core/include/ecal/msg/capnproto/publisher.h b/ecal/core/include/ecal/msg/capnproto/publisher.h index 60192fd4ae..b49c2f2b7c 100644 --- a/ecal/core/include/ecal/msg/capnproto/publisher.h +++ b/ecal/core/include/ecal/msg/capnproto/publisher.h @@ -24,7 +24,7 @@ #pragma once -#include +#include #include // capnp includes @@ -50,7 +50,7 @@ namespace eCAL * **/ template - class CPublisher : public eCAL::CPublisher + class CPublisher : public eCAL::v5::CPublisher { class CPayload : public eCAL::CPayloadWriter { @@ -89,7 +89,7 @@ namespace eCAL * @brief Constructor. **/ CPublisher() - : eCAL::CPublisher() + : eCAL::v5::CPublisher() , builder(std::make_unique()) , root_builder(builder->initRoot()) { @@ -102,7 +102,7 @@ namespace eCAL * @param config_ Optional configuration parameters. **/ CPublisher(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = {}) - : eCAL::CPublisher(topic_name_, GetDataTypeInformation(), config_) + : eCAL::v5::CPublisher(topic_name_, GetDataTypeInformation(), config_) , builder(std::make_unique()) , root_builder(builder->initRoot()) { @@ -152,7 +152,7 @@ namespace eCAL **/ bool Create(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = {}) { - return(eCAL::CPublisher::Create(topic_name_, GetDataTypeInformation(), config_)); + return(eCAL::v5::CPublisher::Create(topic_name_, GetDataTypeInformation(), config_)); } typename message_type::Builder GetBuilder() @@ -160,10 +160,10 @@ namespace eCAL return root_builder; } - void Send() + bool Send() { CPayload payload{ *builder }; - eCAL::CPublisher::Send(payload); + return eCAL::v5::CPublisher::Send(payload); } private: diff --git a/ecal/core/include/ecal/msg/protobuf/dynamic_publisher.h b/ecal/core/include/ecal/msg/protobuf/dynamic_publisher.h index 2ff702f89a..46d55bd08f 100644 --- a/ecal/core/include/ecal/msg/protobuf/dynamic_publisher.h +++ b/ecal/core/include/ecal/msg/protobuf/dynamic_publisher.h @@ -86,9 +86,9 @@ namespace eCAL * * @param time_ Optional time stamp. * - * @return Number of bytes sent. + * @return True if succeeded, false if not. **/ - size_t Send(long long time_ = -1) + bool Send(long long time_ = -1) { return CMsgPublisher::Send(*m_msg, time_); } diff --git a/ecal/core/include/ecal/msg/protobuf/publisher.h b/ecal/core/include/ecal/msg/protobuf/publisher.h index daab06f0bd..d3acf442c9 100644 --- a/ecal/core/include/ecal/msg/protobuf/publisher.h +++ b/ecal/core/include/ecal/msg/protobuf/publisher.h @@ -27,7 +27,7 @@ #include #include #include -#include +#include // protobuf includes #ifdef _MSC_VER @@ -54,7 +54,7 @@ namespace eCAL * **/ template - class CPublisher : public eCAL::CPublisher + class CPublisher : public eCAL::v5::CPublisher { class CPayload : public eCAL::CPayloadWriter { @@ -92,7 +92,7 @@ namespace eCAL /** * @brief Constructor. **/ - CPublisher() : eCAL::CPublisher() + CPublisher() : eCAL::v5::CPublisher() { } @@ -107,7 +107,7 @@ namespace eCAL // where the vtable is not created yet, or it's destructed. // Probably we can handle the Message publishers differently. One message publisher class and then one class for payloads and getting type // descriptor information. - explicit CPublisher(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = GetPublisherConfiguration()) : eCAL::CPublisher(topic_name_, CPublisher::GetDataTypeInformation(), config_) + explicit CPublisher(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = GetPublisherConfiguration()) : eCAL::v5::CPublisher(topic_name_, CPublisher::GetDataTypeInformation(), config_) { } @@ -146,7 +146,7 @@ namespace eCAL **/ bool Create(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = GetPublisherConfiguration()) { - return(eCAL::CPublisher::Create(topic_name_, GetDataTypeInformation(), config_)); + return(eCAL::v5::CPublisher::Create(topic_name_, GetDataTypeInformation(), config_)); } /** @@ -155,12 +155,12 @@ namespace eCAL * @param msg_ The message object. * @param time_ Time stamp. * - * @return Number of bytes sent. + * @return True if succeeded, false if not. **/ - size_t Send(const T& msg_, long long time_ = DEFAULT_TIME_ARGUMENT) + bool Send(const T& msg_, long long time_ = DEFAULT_TIME_ARGUMENT) { CPayload payload{ msg_ }; - return eCAL::CPublisher::Send(payload, time_); + return (eCAL::v5::CPublisher::Send(payload, time_) > 0); } private: diff --git a/ecal/core/include/ecal/msg/publisher.h b/ecal/core/include/ecal/msg/publisher.h index 5497c9bf95..900a9b57b0 100644 --- a/ecal/core/include/ecal/msg/publisher.h +++ b/ecal/core/include/ecal/msg/publisher.h @@ -25,7 +25,7 @@ #pragma once #include -#include +#include #include #include @@ -43,7 +43,7 @@ namespace eCAL * **/ template - class CMsgPublisher : public CPublisher + class CMsgPublisher : public v5::CPublisher { public: /** @@ -130,9 +130,9 @@ namespace eCAL * @param msg_ The message object. * @param time_ Optional time stamp. * - * @return Number of bytes sent. + * @return True if succeeded, false if not. **/ - size_t Send(const T& msg_, long long time_ = DEFAULT_TIME_ARGUMENT) + bool Send(const T& msg_, long long time_ = DEFAULT_TIME_ARGUMENT) { // this is an optimization ... // if there is no subscription we do not waste time for @@ -141,7 +141,7 @@ namespace eCAL // counting and frequency calculation for the monitoring layer if (!IsSubscribed()) { - return(CPublisher::Send(nullptr, 0, time_)); + return(eCAL::v5::CPublisher::Send(nullptr, 0, time_) > 0); } // if we have a subscription allocate memory for the @@ -153,13 +153,13 @@ namespace eCAL m_buffer.resize(size); if (Serialize(msg_, m_buffer.data(), m_buffer.size())) { - return(CPublisher::Send(m_buffer.data(), size, time_)); + return(eCAL::v5::CPublisher::Send(m_buffer.data(), size, time_) > 0); } } else { // send a zero payload length message to trigger the subscriber side - return(CPublisher::Send(nullptr, 0, time_)); + return(eCAL::v5::CPublisher::Send(nullptr, 0, time_) > 0); } return(0); } diff --git a/ecal/core/src/cimpl/ecal_log_cimpl.cpp b/ecal/core/src/cimpl/ecal_log_cimpl.cpp index 58b564b34b..b6508b3ce7 100644 --- a/ecal/core/src/cimpl/ecal_log_cimpl.cpp +++ b/ecal/core/src/cimpl/ecal_log_cimpl.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/core/src/cimpl/ecal_publisher_cimpl.cpp b/ecal/core/src/cimpl/ecal_publisher_cimpl.cpp index cc32d00e35..b7ff5a97fe 100644 --- a/ecal/core/src/cimpl/ecal_publisher_cimpl.cpp +++ b/ecal/core/src/cimpl/ecal_publisher_cimpl.cpp @@ -23,6 +23,7 @@ **/ #include +#include #include #include "ecal_common_cimpl.h" @@ -52,14 +53,14 @@ extern "C" { ECALC_API ECAL_HANDLE eCAL_Pub_New() { - auto* pub = new eCAL::CPublisher; // NOLINT(*-owning-memory) + auto* pub = new eCAL::v5::CPublisher; // NOLINT(*-owning-memory) return(pub); } ECALC_API int eCAL_Pub_Create(ECAL_HANDLE handle_, const char* topic_name_, const char* topic_type_name_, const char* topic_type_encoding_, const char* topic_desc_, int topic_desc_len_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); const eCAL::SDataTypeInformation topic_info = { topic_type_name_, topic_type_encoding_, std::string(topic_desc_, static_cast(topic_desc_len_)) }; if (!pub->Create(topic_name_, topic_info)) return(0); return(1); @@ -68,7 +69,7 @@ extern "C" ECALC_API int eCAL_Pub_Destroy(ECAL_HANDLE handle_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); delete pub; // NOLINT(*-owning-memory) return(1); } @@ -76,7 +77,7 @@ extern "C" ECALC_API int eCAL_Pub_SetAttribute(ECAL_HANDLE handle_, const char* attr_name_, int attr_name_len_, const char* attr_value_, int attr_value_len_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); if (pub->SetAttribute(std::string(attr_name_, static_cast(attr_name_len_)), std::string(attr_value_, static_cast(attr_value_len_)))) return(1); return(0); } @@ -84,7 +85,7 @@ extern "C" ECALC_API int eCAL_Pub_ClearAttribute(ECAL_HANDLE handle_, const char* attr_name_, int attr_name_len_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); if (pub->ClearAttribute(std::string(attr_name_, static_cast(attr_name_len_)))) return(1); return(0); } @@ -92,7 +93,7 @@ extern "C" ECALC_API int eCAL_Pub_SetID(ECAL_HANDLE handle_, long long id_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); if (pub->SetID(id_)) return(1); return(0); } @@ -100,7 +101,7 @@ extern "C" ECALC_API int eCAL_Pub_IsSubscribed(ECAL_HANDLE handle_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); if (pub->IsSubscribed()) return(1); return(0); } @@ -108,7 +109,7 @@ extern "C" ECALC_API int eCAL_Pub_Send(ECAL_HANDLE handle_, const void* const buf_, int buf_len_, long long time_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); const size_t ret = pub->Send(buf_, static_cast(buf_len_), time_); if (static_cast(ret) == buf_len_) { @@ -120,7 +121,7 @@ extern "C" ECALC_API int eCAL_Pub_AddEventCallback(ECAL_HANDLE handle_, enum eCAL_Publisher_Event type_, PubEventCallbackCT callback_, void* par_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); auto callback = std::bind(g_pub_event_callback, std::placeholders::_1, std::placeholders::_2, callback_, par_); if (pub->AddEventCallback(type_, callback)) return(1); return(0); @@ -129,7 +130,7 @@ extern "C" ECALC_API int eCAL_Pub_RemEventCallback(ECAL_HANDLE handle_, enum eCAL_Publisher_Event type_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); if (pub->RemEventCallback(type_)) return(1); return(0); } @@ -137,7 +138,7 @@ extern "C" ECALC_API int eCAL_Pub_Dump(ECAL_HANDLE handle_, void* buf_, int buf_len_) { if (handle_ == nullptr) return(0); - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); const std::string dump = pub->Dump(); if (!dump.empty()) { diff --git a/ecal/core/src/logging/ecal_log.cpp b/ecal/core/src/logging/ecal_log.cpp index 2e9c33def3..5c5a2ecda6 100644 --- a/ecal/core/src/logging/ecal_log.cpp +++ b/ecal/core/src/logging/ecal_log.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/core/src/pubsub/ecal_pubgate.cpp b/ecal/core/src/pubsub/ecal_pubgate.cpp index 19e52073f7..88906a720c 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.cpp +++ b/ecal/core/src/pubsub/ecal_pubgate.cpp @@ -55,39 +55,35 @@ namespace eCAL if(!m_created) return; // stop & destroy all remaining publisher - const std::unique_lock lock(m_topic_name_datawriter_sync); - for (const auto& datawriter : m_topic_name_datawriter_map) - { - datawriter.second->Stop(); - } - m_topic_name_datawriter_map.clear(); + const std::unique_lock lock(m_topic_name_publisher_mutex); + m_topic_name_publisher_map.clear(); m_created = false; } - bool CPubGate::Register(const std::string& topic_name_, const std::shared_ptr& datawriter_) + bool CPubGate::Register(const std::string& topic_name_, const std::shared_ptr& publisher_) { if(!m_created) return(false); // register writer and multicast group - const std::unique_lock lock(m_topic_name_datawriter_sync); - m_topic_name_datawriter_map.emplace(std::pair>(topic_name_, datawriter_)); + const std::unique_lock lock(m_topic_name_publisher_mutex); + m_topic_name_publisher_map.emplace(std::pair>(topic_name_, publisher_)); return(true); } - bool CPubGate::Unregister(const std::string& topic_name_, const std::shared_ptr& datawriter_) + bool CPubGate::Unregister(const std::string& topic_name_, const std::shared_ptr& publisher_) { if(!m_created) return(false); bool ret_state = false; - const std::unique_lock lock(m_topic_name_datawriter_sync); - auto res = m_topic_name_datawriter_map.equal_range(topic_name_); + const std::unique_lock lock(m_topic_name_publisher_mutex); + auto res = m_topic_name_publisher_map.equal_range(topic_name_); for(auto iter = res.first; iter != res.second; ++iter) { - if(iter->second == datawriter_) + if(iter->second == publisher_) { - m_topic_name_datawriter_map.erase(iter); + m_topic_name_publisher_map.erase(iter); ret_state = true; break; } @@ -96,7 +92,7 @@ namespace eCAL return(ret_state); } - void CPubGate::ApplySubRegistration(const Registration::Sample& ecal_sample_) + void CPubGate::ApplySubscriberRegistration(const Registration::Sample& ecal_sample_) { if(!m_created) return; @@ -110,7 +106,7 @@ namespace eCAL const auto& subscription_info = ecal_sample_.identifier; const SDataTypeInformation& topic_information = ecal_topic.tdatatype; - CDataWriter::SLayerStates layer_states; + CPublisherImpl::SLayerStates layer_states; for (const auto& layer : ecal_topic.tlayer) { // transport layer versions 0 and 1 did not support dynamic layer enable feature @@ -146,15 +142,15 @@ namespace eCAL #endif // register subscriber - const std::shared_lock lock(m_topic_name_datawriter_sync); - auto res = m_topic_name_datawriter_map.equal_range(topic_name); - for(TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter) + const std::shared_lock lock(m_topic_name_publisher_mutex); + auto res = m_topic_name_publisher_map.equal_range(topic_name); + for(TopicNamePublisherMapT::const_iterator iter = res.first; iter != res.second; ++iter) { - iter->second->ApplySubscription(subscription_info, topic_information, layer_states, reader_par); + iter->second->ApplySubscriberRegistration(subscription_info, topic_information, layer_states, reader_par); } } - void CPubGate::ApplySubUnregistration(const Registration::Sample& ecal_sample_) + void CPubGate::ApplySubscriberUnregistration(const Registration::Sample& ecal_sample_) { if (!m_created) return; @@ -165,13 +161,14 @@ namespace eCAL if (topic_name.empty()) return; const auto& subscription_info = ecal_sample_.identifier; + const SDataTypeInformation& topic_information = ecal_topic.tdatatype; // unregister subscriber - const std::shared_lock lock(m_topic_name_datawriter_sync); - auto res = m_topic_name_datawriter_map.equal_range(topic_name); - for (TopicNameDataWriterMapT::const_iterator iter = res.first; iter != res.second; ++iter) + const std::shared_lock lock(m_topic_name_publisher_mutex); + auto res = m_topic_name_publisher_map.equal_range(topic_name); + for (TopicNamePublisherMapT::const_iterator iter = res.first; iter != res.second; ++iter) { - iter->second->RemoveSubscription(subscription_info); + iter->second->ApplySubscriberUnregistration(subscription_info, topic_information); } } @@ -180,8 +177,8 @@ namespace eCAL if (!m_created) return; // read reader registrations - const std::shared_lock lock(m_topic_name_datawriter_sync); - for (const auto& iter : m_topic_name_datawriter_map) + const std::shared_lock lock(m_topic_name_publisher_mutex); + for (const auto& iter : m_topic_name_publisher_map) { iter.second->GetRegistration(reg_sample_list_.push_back()); } diff --git a/ecal/core/src/pubsub/ecal_pubgate.h b/ecal/core/src/pubsub/ecal_pubgate.h index cc73d36bc7..b3d4937508 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.h +++ b/ecal/core/src/pubsub/ecal_pubgate.h @@ -23,7 +23,7 @@ #pragma once -#include "readwrite/ecal_writer.h" +#include "ecal_publisher_impl.h" #include "serialization/ecal_struct_sample_registration.h" #include @@ -44,19 +44,19 @@ namespace eCAL void Start(); void Stop(); - bool Register(const std::string& topic_name_, const std::shared_ptr& datawriter_); - bool Unregister(const std::string& topic_name_, const std::shared_ptr& datawriter_); + bool Register(const std::string& topic_name_, const std::shared_ptr& publisher_); + bool Unregister(const std::string& topic_name_, const std::shared_ptr& publisher_); - void ApplySubRegistration(const Registration::Sample& ecal_sample_); - void ApplySubUnregistration(const Registration::Sample& ecal_sample_); + void ApplySubscriberRegistration(const Registration::Sample& ecal_sample_); + void ApplySubscriberUnregistration(const Registration::Sample& ecal_sample_); void GetRegistrations(Registration::SampleList& reg_sample_list_); protected: static std::atomic m_created; - using TopicNameDataWriterMapT = std::multimap>; - std::shared_timed_mutex m_topic_name_datawriter_sync; - TopicNameDataWriterMapT m_topic_name_datawriter_map; + using TopicNamePublisherMapT = std::multimap>; + std::shared_timed_mutex m_topic_name_publisher_mutex; + TopicNamePublisherMapT m_topic_name_publisher_map; }; } diff --git a/ecal/core/src/pubsub/ecal_publisher.cpp b/ecal/core/src/pubsub/ecal_publisher.cpp index a3d73f66c9..0f767402a3 100644 --- a/ecal/core/src/pubsub/ecal_publisher.cpp +++ b/ecal/core/src/pubsub/ecal_publisher.cpp @@ -18,13 +18,13 @@ */ /** - * @brief common data publisher based on eCAL + * @brief eCAL publisher interface **/ #include #include "ecal_globals.h" -#include "readwrite/ecal_writer.h" +#include "ecal_publisher_impl.h" #include "readwrite/ecal_writer_buffer_payload.h" #include "config/builder/writer_attribute_builder.h" @@ -38,207 +38,102 @@ namespace eCAL { - CPublisher::CPublisher() : - m_datawriter(nullptr), - m_filter_id(0) - { - } - CPublisher::CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_) - : CPublisher() { - CPublisher::Create(topic_name_, data_type_info_, config_); - } + // create publisher implementation + m_publisher_impl = std::make_shared(data_type_info_, BuildWriterAttributes(topic_name_, config_, GetTransportLayerConfiguration(), GetRegistrationConfiguration())); - CPublisher::CPublisher(const std::string& topic_name_, const Publisher::Configuration& config_) - : CPublisher(topic_name_, SDataTypeInformation{}, config_) - {} - - CPublisher::~CPublisher() - { - CPublisher::Destroy(); + // register publisher + if(g_pubgate() != nullptr) g_pubgate()->Register(topic_name_, m_publisher_impl); } - /** - * @brief CPublisher are move-enabled - **/ - CPublisher::CPublisher(CPublisher&& rhs) noexcept : - m_datawriter(std::move(rhs.m_datawriter)), - m_filter_id(rhs.m_filter_id) + CPublisher::CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const PubEventIDCallbackT event_callback_, const Publisher::Configuration& config_) : + CPublisher(topic_name_, data_type_info_, config_) { - rhs.m_datawriter = nullptr; + // add event callback for all current event types + m_publisher_impl->AddEventIDCallback(event_callback_); } - /** - * @brief CPublisher are move-enabled - **/ - CPublisher& CPublisher::operator=(CPublisher&& rhs) noexcept + CPublisher::~CPublisher() { - // Call destroy, to clean up the current state, then afterwards move all elements - Destroy(); - - m_datawriter = std::move(rhs.m_datawriter); - m_filter_id = rhs.m_filter_id; + // could be already destroyed by move + if (m_publisher_impl == nullptr) return; - rhs.m_datawriter = nullptr; - - return *this; - } + // unregister publisher + if (g_pubgate() != nullptr) g_pubgate()->Unregister(m_publisher_impl->GetTopicName(), m_publisher_impl); - bool CPublisher::Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_) - { - if (m_datawriter != nullptr) return(false); - if (topic_name_.empty()) return(false); - - // create datawriter - m_datawriter = std::make_shared(data_type_info_, BuildWriterAttributes(topic_name_, config_, GetTransportLayerConfiguration(), GetRegistrationConfiguration())); - - // register datawriter - g_pubgate()->Register(topic_name_, m_datawriter); - - // we made it :-) - return(true); - } - - bool CPublisher::Create(const std::string& topic_name_) - { - return Create(topic_name_, SDataTypeInformation()); - } - - bool CPublisher::Destroy() - { - if (m_datawriter == nullptr) return(false); - - // unregister datawriter - if(g_pubgate() != nullptr) g_pubgate()->Unregister(m_datawriter->GetTopicName(), m_datawriter); #ifndef NDEBUG - // log it - eCAL::Logging::Log(log_level_debug1, std::string(m_datawriter->GetTopicName() + "::CPublisher::Destroy")); + eCAL::Logging::Log(log_level_debug1, std::string(m_publisher_impl->GetTopicName() + "::CPublisher::Destructor")); #endif - - // stop & destroy datawriter - m_datawriter->Stop(); - m_datawriter.reset(); - - // we made it :-) - return(true); } - bool CPublisher::SetDataTypeInformation(const SDataTypeInformation& data_type_info_) - { - if (m_datawriter == nullptr) return false; - return m_datawriter->SetDataTypeInformation(data_type_info_); - } - - bool CPublisher::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) - { - if(m_datawriter == nullptr) return false; - return m_datawriter->SetAttribute(attr_name_, attr_value_); - } - - bool CPublisher::ClearAttribute(const std::string& attr_name_) + CPublisher::CPublisher(CPublisher&& rhs) noexcept : + m_publisher_impl(std::move(rhs.m_publisher_impl)) { - if(m_datawriter == nullptr) return false; - return m_datawriter->ClearAttribute(attr_name_); } - bool CPublisher::SetID(long long filter_id_) + CPublisher& CPublisher::operator=(CPublisher&& rhs) noexcept { - m_filter_id = filter_id_; - return true; + if (this != &rhs) + { + m_publisher_impl = std::move(rhs.m_publisher_impl); + } + return *this; } - size_t CPublisher::Send(const void* const buf_, const size_t len_, const long long time_ /* = DEFAULT_TIME_ARGUMENT */) + bool CPublisher::Send(const void* const buf_, const size_t len_, const long long time_ /* = DEFAULT_TIME_ARGUMENT */) { CBufferPayloadWriter payload{ buf_, len_ }; return Send(payload, time_); } - size_t CPublisher::Send(CPayloadWriter& payload_, long long time_) + bool CPublisher::Send(CPayloadWriter& payload_, long long time_) { - if (m_datawriter == nullptr) return 0; - - // in an optimization case the + if (m_publisher_impl == nullptr) return false; + + // in an optimization case the // publisher can send an empty package // or we do not have any subscription at all // then the data writer will only do some statistics // for the monitoring layer and return - if (!IsSubscribed()) + if (GetSubscriberCount() == 0) { - m_datawriter->RefreshSendCounter(); - return(payload_.GetSize()); + m_publisher_impl->RefreshSendCounter(); + // we return false here to indicate that we did not really send something + return false; } // send content via data writer layer const long long write_time = (time_ == DEFAULT_TIME_ARGUMENT) ? eCAL::Time::GetMicroSeconds() : time_; - const size_t written_bytes = m_datawriter->Write(payload_, write_time, m_filter_id); - - // return number of bytes written - return written_bytes; - } - - size_t CPublisher::Send(const std::string& s_, long long time_) - { - return(Send(s_.data(), s_.size(), time_)); + return m_publisher_impl->Write(payload_, write_time, 0); } - bool CPublisher::AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_) + bool CPublisher::Send(const std::string& payload_, long long time_) { - if (m_datawriter == nullptr) return(false); - RemEventCallback(type_); - return(m_datawriter->AddEventCallback(type_, std::move(callback_))); - } - - bool CPublisher::RemEventCallback(eCAL_Publisher_Event type_) - { - if (m_datawriter == nullptr) return(false); - return(m_datawriter->RemEventCallback(type_)); - } - - bool CPublisher::IsSubscribed() const - { -#if ECAL_CORE_REGISTRATION - if(m_datawriter == nullptr) return(false); - return(m_datawriter->IsSubscribed()); -#else // ECAL_CORE_REGISTRATION - return(true); -#endif // ECAL_CORE_REGISTRATION + return(Send(payload_.data(), payload_.size(), time_)); } size_t CPublisher::GetSubscriberCount() const { - if (m_datawriter == nullptr) return(0); - return(m_datawriter->GetSubscriberCount()); + if (m_publisher_impl == nullptr) return 0; + return(m_publisher_impl->GetSubscriberCount()); } std::string CPublisher::GetTopicName() const { - if(m_datawriter == nullptr) return(""); - return(m_datawriter->GetTopicName()); + if (m_publisher_impl == nullptr) return ""; + return(m_publisher_impl->GetTopicName()); } - Registration::STopicId CPublisher::GetId() const + Registration::STopicId CPublisher::GetTopicId() const { - if (m_datawriter == nullptr) return{}; - return(m_datawriter->GetId()); + if (m_publisher_impl == nullptr) return Registration::STopicId(); + return(m_publisher_impl->GetTopicId()); } SDataTypeInformation CPublisher::GetDataTypeInformation() const { - if (m_datawriter == nullptr) return(SDataTypeInformation{}); - return(m_datawriter->GetDataTypeInformation()); - } - - std::string CPublisher::Dump(const std::string& indent_ /* = "" */) const - { - std::stringstream out; - - out << indent_ << "----------------------" << '\n'; - out << indent_ << " class CPublisher" << '\n'; - out << indent_ << "----------------------" << '\n'; - if((m_datawriter != nullptr) && m_datawriter->IsCreated()) out << indent_ << m_datawriter->Dump(" "); - out << '\n'; - - return(out.str()); + if (m_publisher_impl == nullptr) return SDataTypeInformation(); + return(m_publisher_impl->GetDataTypeInformation()); } -} \ No newline at end of file +} diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/pubsub/ecal_publisher_impl.cpp similarity index 78% rename from ecal/core/src/readwrite/ecal_writer.cpp rename to ecal/core/src/pubsub/ecal_publisher_impl.cpp index 9d6ceaca15..c1eb320020 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/pubsub/ecal_publisher_impl.cpp @@ -18,7 +18,7 @@ */ /** - * @brief common eCAL data writer + * @brief eCAL publisher implementation **/ #include @@ -30,15 +30,16 @@ #include "registration/ecal_registration_provider.h" #endif -#include "ecal_writer.h" -#include "ecal_writer_base.h" -#include "ecal_writer_buffer_payload.h" +#include "ecal_publisher_impl.h" #include "ecal_global_accessors.h" -#include "ecal_transport_layer.h" -#include "config/builder/shm_attribute_builder.h" -#include "config/builder/tcp_attribute_builder.h" -#include "config/builder/udp_attribute_builder.h" +#include "readwrite/ecal_writer_base.h" +#include "readwrite/ecal_writer_buffer_payload.h" +#include "readwrite/ecal_transport_layer.h" + +#include "readwrite/config/builder/shm_attribute_builder.h" +#include "readwrite/config/builder/tcp_attribute_builder.h" +#include "readwrite/config/builder/udp_attribute_builder.h" #include #include @@ -79,14 +80,14 @@ namespace } // function to log the states of SLayerState - void logLayerState(const std::string& layerName, const eCAL::CDataWriter::SLayerState& state) { + void logLayerState(const std::string& layerName, const eCAL::CPublisherImpl::SLayerState& state) { std::cout << layerName << " - Read Enabled: " << boolToString(state.read_enabled) << ", Write Enabled: " << boolToString(state.write_enabled) << ", Write Active : " << boolToString(state.active) << std::endl; } // function to log the states of SLayerStates - void logLayerStates(const eCAL::CDataWriter::SLayerStates& states) { + void logLayerStates(const eCAL::CPublisherImpl::SLayerStates& states) { std::cout << "Logging Layer States:" << std::endl; logLayerState("UDP", states.udp); logLayerState("SHM", states.shm); @@ -97,7 +98,7 @@ namespace namespace eCAL { - CDataWriter::CDataWriter(const SDataTypeInformation& topic_info_, const eCAL::eCALWriter::SAttributes& attr_) : + CPublisherImpl::CPublisherImpl(const SDataTypeInformation& topic_info_, const eCAL::eCALWriter::SAttributes& attr_) : m_topic_info(topic_info_), m_attributes(attr_), m_frequency_calculator(3.0f), @@ -105,7 +106,7 @@ namespace eCAL { #ifndef NDEBUG // log it - Logging::Log(log_level_debug1, m_attributes.topic_name + "::CDataWriter::Constructor"); + Logging::Log(log_level_debug2, m_attributes.topic_name + "::CDataWriter::Constructor"); #endif // build topic id @@ -117,36 +118,27 @@ namespace eCAL m_created = true; } - CDataWriter::~CDataWriter() + CPublisherImpl::~CPublisherImpl() { #ifndef NDEBUG // log it Logging::Log(log_level_debug1, m_attributes.topic_name + "::CDataWriter::Destructor"); #endif - Stop(); - } - - bool CDataWriter::Stop() - { - if (!m_created) return false; -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug1, m_attributes.topic_name + "::CDataWriter::Stop"); -#endif + if (!m_created) return; // stop all transport layer StopAllLayer(); // clear subscriber maps { - const std::lock_guard lock(m_connection_map_mtx); + const std::lock_guard lock(m_connection_map_mutex); m_connection_map.clear(); } // clear event callback map { - const std::lock_guard lock(m_event_callback_map_mtx); + const std::lock_guard lock(m_event_callback_map_mutex); m_event_callback_map.clear(); } @@ -155,11 +147,9 @@ namespace eCAL // and unregister Unregister(); - - return true; } - size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long filter_id_) + bool CPublisherImpl::Write(CPayloadWriter& payload_, long long time_, long long filter_id_) { // get payload buffer size (one time, to avoid multiple computations) const size_t payload_buf_size(payload_.GetSize()); @@ -350,11 +340,10 @@ namespace eCAL #endif // ECAL_CORE_TRANSPORT_TCP // return success - if (written) return payload_buf_size; - else return 0; + return written; } - bool CDataWriter::SetDataTypeInformation(const SDataTypeInformation& topic_info_) + bool CPublisherImpl::SetDataTypeInformation(const SDataTypeInformation& topic_info_) { m_topic_info = topic_info_; @@ -366,7 +355,7 @@ namespace eCAL return(true); } - bool CDataWriter::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) + bool CPublisherImpl::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) { m_attr[attr_name_] = attr_value_; @@ -378,7 +367,7 @@ namespace eCAL return(true); } - bool CDataWriter::ClearAttribute(const std::string& attr_name_) + bool CPublisherImpl::ClearAttribute(const std::string& attr_name_) { m_attr.erase(attr_name_); @@ -390,7 +379,7 @@ namespace eCAL return(true); } - bool CDataWriter::AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_) + bool CPublisherImpl::AddEventCallback(eCAL_Publisher_Event type_, const PubEventCallbackT callback_) { if (!m_created) return(false); @@ -400,14 +389,14 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_attributes.topic_name + "::CDataWriter::AddEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = std::move(callback_); + const std::lock_guard lock(m_event_callback_map_mutex); + m_event_callback_map[type_] = callback_; } return(true); } - bool CDataWriter::RemEventCallback(eCAL_Publisher_Event type_) + bool CPublisherImpl::RemoveEventCallback(eCAL_Publisher_Event type_) { if (!m_created) return(false); @@ -417,14 +406,30 @@ namespace eCAL // log it Logging::Log(log_level_debug2, m_attributes.topic_name + "::CDataWriter::RemEventCallback"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); + const std::lock_guard lock(m_event_callback_map_mutex); m_event_callback_map[type_] = nullptr; } return(true); } - void CDataWriter::ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_) + bool CPublisherImpl::AddEventIDCallback(const PubEventIDCallbackT callback_) + { + if (!m_created) return false; + const std::lock_guard lock(m_event_id_callback_mutex); + m_event_id_callback = callback_; + return true; + } + + bool CPublisherImpl::RemEventIDCallback() + { + if (!m_created) return false; + const std::lock_guard lock(m_event_id_callback_mutex); + m_event_id_callback = nullptr; + return true; + } + + void CPublisherImpl::ApplySubscriberRegistration(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_) { // collect layer states std::vector pub_layers; @@ -449,8 +454,6 @@ namespace eCAL #endif // determine if we need to start a transport layer - // if a new layer gets activated, we reregister for SHM and TCP to force the exchange of connection parameter - // without this forced registration we would need one additional registration loop for these two layers to establish the connection const TLayer::eTransportLayer layer2activate = DetermineTransportLayer2Start(pub_layers, sub_layers, m_attributes.host_name == subscription_info_.host_name); switch (layer2activate) { @@ -487,7 +490,7 @@ namespace eCAL bool is_new_connection = false; bool is_updated_connection = false; { - const std::lock_guard lock(m_connection_map_mtx); + const std::lock_guard lock(m_connection_map_mutex); auto subscription_info_iter = m_connection_map.find(subscription_info_); if (subscription_info_iter == m_connection_map.end()) @@ -525,12 +528,12 @@ namespace eCAL if (is_new_connection) { // fire connect event - FireConnectEvent(subscription_info_.entity_id, data_type_info_); + FireConnectEvent(subscription_info_, data_type_info_); } else if (is_updated_connection) { // fire update event - FireUpdateEvent(subscription_info_.entity_id, data_type_info_); + FireUpdateEvent(subscription_info_, data_type_info_); } #ifndef NDEBUG @@ -539,7 +542,7 @@ namespace eCAL #endif } - void CDataWriter::RemoveSubscription(const SSubscriptionInfo& subscription_info_) + void CPublisherImpl::ApplySubscriberUnregistration(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_) { // remove subscription #if ECAL_CORE_TRANSPORT_UDP @@ -552,23 +555,18 @@ namespace eCAL if (m_writer_tcp) m_writer_tcp->RemoveSubscription(subscription_info_.host_name, subscription_info_.process_id, subscription_info_.entity_id); #endif - // remove key from connection map - bool last_connection_gone(false); { - const std::lock_guard lock(m_connection_map_mtx); + const std::lock_guard lock(m_connection_map_mutex); + // remove key from connection map m_connection_map.erase(subscription_info_); - last_connection_gone = m_connection_map.empty(); // update connection count m_connection_count = GetConnectionCount(); } - if (last_connection_gone) - { - // fire disconnect event - FireDisconnectEvent(); - } + // fire disconnect event + FireDisconnectEvent(subscription_info_, data_type_info_); #ifndef NDEBUG // log it @@ -576,7 +574,7 @@ namespace eCAL #endif } - void CDataWriter::RefreshSendCounter() + void CPublisherImpl::RefreshSendCounter() { // increase write clock m_clock++; @@ -585,46 +583,22 @@ namespace eCAL { // we should think about if we would like to potentially use the `time_` variable to tick with (but we would need the same base for checking incoming samples then.... const auto send_time = std::chrono::steady_clock::now(); - const std::lock_guard lock(m_frequency_calculator_mtx); + const std::lock_guard lock(m_frequency_calculator_mutex); m_frequency_calculator.addTick(send_time); } } - bool CDataWriter::IsSubscribed() const + bool CPublisherImpl::IsSubscribed() const { return m_connection_count > 0; } - size_t CDataWriter::GetSubscriberCount() const + size_t CPublisherImpl::GetSubscriberCount() const { return m_connection_count; } - std::string CDataWriter::Dump(const std::string& indent_ /* = "" */) - { - std::stringstream out; - - out << '\n'; - out << indent_ << "--------------------------" << '\n'; - out << indent_ << " class CDataWriter " << '\n'; - out << indent_ << "--------------------------" << '\n'; - out << indent_ << "m_host_name: " << m_attributes.host_name << '\n'; - out << indent_ << "m_host_group_name: " << m_attributes.host_group_name << '\n'; - out << indent_ << "m_topic_name: " << m_attributes.topic_name << '\n'; - out << indent_ << "m_topic_id: " << m_topic_id << '\n'; - out << indent_ << "m_topic_info.encoding: " << m_topic_info.encoding << '\n'; - out << indent_ << "m_topic_info.name: " << m_topic_info.name << '\n'; - out << indent_ << "m_topic_info.desc: " << m_topic_info.descriptor << '\n'; - out << indent_ << "m_id: " << m_id << '\n'; - out << indent_ << "m_clock: " << m_clock << '\n'; - out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n'; - out << indent_ << "m_created: " << m_created << '\n'; - out << std::endl; - - return(out.str()); - } - - void CDataWriter::Register() + void CPublisherImpl::Register() { #if ECAL_CORE_REGISTRATION Registration::Sample registration_sample; @@ -638,7 +612,7 @@ namespace eCAL #endif // ECAL_CORE_REGISTRATION } - void CDataWriter::Unregister() + void CPublisherImpl::Unregister() { #if ECAL_CORE_REGISTRATION Registration::Sample unregistration_sample; @@ -652,12 +626,12 @@ namespace eCAL #endif // ECAL_CORE_REGISTRATION } - void CDataWriter::GetRegistration(Registration::Sample& sample) + void CPublisherImpl::GetRegistration(Registration::Sample& sample) { GetRegistrationSample(sample); } - void CDataWriter::GetRegistrationSample(Registration::Sample& ecal_reg_sample) + void CPublisherImpl::GetRegistrationSample(Registration::Sample& ecal_reg_sample) { ecal_reg_sample.cmd_type = bct_reg_publisher; @@ -737,7 +711,7 @@ namespace eCAL size_t loc_connections(0); size_t ext_connections(0); { - const std::lock_guard lock(m_connection_map_mtx); + const std::lock_guard lock(m_connection_map_mutex); for (const auto& sub : m_connection_map) { if (sub.first.host_name == m_attributes.host_name) @@ -751,7 +725,7 @@ namespace eCAL ecal_reg_sample_topic.connections_ext = static_cast(ext_connections); } - void CDataWriter::GetUnregistrationSample(Registration::Sample& ecal_unreg_sample) + void CPublisherImpl::GetUnregistrationSample(Registration::Sample& ecal_unreg_sample) { ecal_unreg_sample.cmd_type = bct_unreg_publisher; @@ -767,53 +741,55 @@ namespace eCAL ecal_reg_sample_topic.uname = m_attributes.unit_name; } - void CDataWriter::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) + void CPublisherImpl::FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) { - const std::lock_guard lock(m_event_callback_map_mtx); - auto iter = m_event_callback_map.find(pub_event_connected); - if (iter != m_event_callback_map.end() && iter->second) + SPubEventCallbackData data; + data.type = type_; + data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + data.clock = 0; + data.tid = subscription_info_.entity_id; + data.tdatatype = tinfo_; + + // new event handling with topic id + if(m_event_id_callback) { - SPubEventCallbackData data; - data.type = pub_event_connected; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - data.tid = tid_; - data.tdatatype = tinfo_; - (iter->second)(m_attributes.topic_name.c_str(), &data); + Registration::STopicId topic_id; + topic_id.topic_id.entity_id = subscription_info_.entity_id; + topic_id.topic_id.process_id = subscription_info_.process_id; + topic_id.topic_id.host_name = subscription_info_.host_name; + topic_id.topic_name = m_attributes.topic_name; + const std::lock_guard lock(m_event_id_callback_mutex); + // call event callback + m_event_id_callback(topic_id, data); } - } - void CDataWriter::FireUpdateEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) - { - const std::lock_guard lock(m_event_callback_map_mtx); - auto iter = m_event_callback_map.find(pub_event_update_connection); - if (iter != m_event_callback_map.end() && iter->second) + // deprecated event handling with topic name { - SPubEventCallbackData data; - data.type = pub_event_update_connection; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - data.tid = tid_; - data.tdatatype = tinfo_; - (iter->second)(m_attributes.topic_name.c_str(), &data); + const std::lock_guard lock(m_event_callback_map_mutex); + auto iter = m_event_callback_map.find(type_); + if (iter != m_event_callback_map.end() && iter->second) + { + (iter->second)(m_attributes.topic_name.c_str(), &data); + } } } - void CDataWriter::FireDisconnectEvent() + void CPublisherImpl::FireConnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) { - const std::lock_guard lock(m_event_callback_map_mtx); - auto iter = m_event_callback_map.find(pub_event_disconnected); - if (iter != m_event_callback_map.end() && iter->second) - { - SPubEventCallbackData data; - data.type = pub_event_disconnected; - data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - data.clock = 0; - (iter->second)(m_attributes.topic_name.c_str(), &data); - } + FireEvent(pub_event_connected, subscription_info_, tinfo_); + } + + void CPublisherImpl::FireUpdateEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) + { + FireEvent(pub_event_update_connection, subscription_info_, tinfo_); + } + + void CPublisherImpl::FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_) + { + FireEvent(pub_event_disconnected, subscription_info_, tinfo_); } - size_t CDataWriter::GetConnectionCount() + size_t CPublisherImpl::GetConnectionCount() { // no need to lock map here for now, map locked by caller size_t count(0); @@ -827,7 +803,7 @@ namespace eCAL return count; } - bool CDataWriter::StartUdpLayer() + bool CPublisherImpl::StartUdpLayer() { #if ECAL_CORE_TRANSPORT_UDP if (m_layers.udp.write_enabled) return false; @@ -853,7 +829,7 @@ namespace eCAL #endif // ECAL_CORE_TRANSPORT_UDP } - bool CDataWriter::StartShmLayer() + bool CPublisherImpl::StartShmLayer() { #if ECAL_CORE_TRANSPORT_SHM if (m_layers.shm.write_enabled) return false; @@ -879,7 +855,7 @@ namespace eCAL #endif // ECAL_CORE_TRANSPORT_SHM } - bool CDataWriter::StartTcpLayer() + bool CPublisherImpl::StartTcpLayer() { #if ECAL_CORE_TRANSPORT_TCP if (m_layers.tcp.write_enabled) return false; @@ -905,7 +881,7 @@ namespace eCAL #endif // ECAL_CORE_TRANSPORT_TCP } - void CDataWriter::StopAllLayer() + void CPublisherImpl::StopAllLayer() { #if ECAL_CORE_TRANSPORT_UDP // flag disabled @@ -932,7 +908,7 @@ namespace eCAL #endif } - size_t CDataWriter::PrepareWrite(long long id_, size_t len_) + size_t CPublisherImpl::PrepareWrite(long long id_, size_t len_) { // store id m_id = id_; @@ -951,7 +927,7 @@ namespace eCAL return snd_hash; } - TLayer::eTransportLayer CDataWriter::DetermineTransportLayer2Start(const std::vector& enabled_pub_layer_, const std::vector& enabled_sub_layer_, bool same_host_) + TLayer::eTransportLayer CPublisherImpl::DetermineTransportLayer2Start(const std::vector& enabled_pub_layer_, const std::vector& enabled_sub_layer_, bool same_host_) { // determine the priority list to use const Publisher::Configuration::LayerPriorityVector& layer_priority_vector = same_host_ ? m_attributes.layer_priority_local : m_attributes.layer_priority_remote; @@ -971,10 +947,10 @@ namespace eCAL return TLayer::eTransportLayer::tlayer_none; } - int32_t CDataWriter::GetFrequency() + int32_t CPublisherImpl::GetFrequency() { const auto frequency_time = std::chrono::steady_clock::now(); - const std::lock_guard lock(m_frequency_calculator_mtx); + const std::lock_guard lock(m_frequency_calculator_mutex); return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); } } diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/pubsub/ecal_publisher_impl.h similarity index 72% rename from ecal/core/src/readwrite/ecal_writer.h rename to ecal/core/src/pubsub/ecal_publisher_impl.h index 14fffc4175..509e980297 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/pubsub/ecal_publisher_impl.h @@ -18,7 +18,7 @@ */ /** - * @brief common eCAL data writer + * @brief eCAL publisher implementation **/ #pragma once @@ -30,18 +30,18 @@ #include "serialization/ecal_serialize_sample_registration.h" #include "util/frequency_calculator.h" -#include "config/attributes/writer_attributes.h" +#include "readwrite/config/attributes/writer_attributes.h" #if ECAL_CORE_TRANSPORT_UDP -#include "udp/ecal_writer_udp.h" +#include "readwrite/udp/ecal_writer_udp.h" #endif #if ECAL_CORE_TRANSPORT_SHM -#include "shm/ecal_writer_shm.h" +#include "readwrite/shm/ecal_writer_shm.h" #endif #if ECAL_CORE_TRANSPORT_TCP -#include "tcp/ecal_writer_tcp.h" +#include "readwrite/tcp/ecal_writer_tcp.h" #endif #include @@ -55,7 +55,7 @@ namespace eCAL { - class CDataWriter + class CPublisherImpl { public: struct SLayerState @@ -74,23 +74,26 @@ namespace eCAL using SSubscriptionInfo = Registration::SampleIdentifier; - CDataWriter(const SDataTypeInformation& topic_info_, const eCAL::eCALWriter::SAttributes& attr_); - ~CDataWriter(); + CPublisherImpl(const SDataTypeInformation& topic_info_, const eCAL::eCALWriter::SAttributes& attr_); + ~CPublisherImpl(); - bool Stop(); - - size_t Write(CPayloadWriter& payload_, long long time_, long long filter_id_); + bool Write(CPayloadWriter& payload_, long long time_, long long filter_id_); bool SetDataTypeInformation(const SDataTypeInformation& topic_info_); - bool AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_); - bool RemEventCallback(eCAL_Publisher_Event type_); + // deprecated event callback interface + bool AddEventCallback(eCAL_Publisher_Event type_, const PubEventCallbackT callback_); + bool RemoveEventCallback(eCAL_Publisher_Event type_); + + // future event callback interface + bool AddEventIDCallback(const PubEventIDCallbackT callback_); + bool RemEventIDCallback(); bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_); bool ClearAttribute(const std::string& attr_name_); - void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_); - void RemoveSubscription(const SSubscriptionInfo& subscription_info_); + void ApplySubscriberRegistration(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_); + void ApplySubscriberUnregistration(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_); void GetRegistration(Registration::Sample& sample); void RefreshSendCounter(); @@ -100,7 +103,7 @@ namespace eCAL bool IsSubscribed() const; size_t GetSubscriberCount() const; - Registration::STopicId GetId() const + Registration::STopicId GetTopicId() const { Registration::STopicId id; id.topic_name = m_attributes.topic_name; @@ -113,8 +116,6 @@ namespace eCAL const std::string& GetTopicName() const { return(m_attributes.topic_name); } const SDataTypeInformation& GetDataTypeInformation() const { return m_topic_info; } - std::string Dump(const std::string& indent_ = ""); - protected: void Register(); void Unregister(); @@ -128,9 +129,11 @@ namespace eCAL void StopAllLayer(); - void FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); - void FireUpdateEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); - void FireDisconnectEvent(); + void FireEvent(const eCAL_Publisher_Event type_, const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); + + void FireConnectEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); + void FireUpdateEvent (const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); + void FireDisconnectEvent(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& tinfo_); size_t GetConnectionCount(); @@ -155,18 +158,21 @@ namespace eCAL bool state = false; }; using SSubscriptionMapT = std::map; - mutable std::mutex m_connection_map_mtx; + mutable std::mutex m_connection_map_mutex; SSubscriptionMapT m_connection_map; std::atomic m_connection_count{ 0 }; using EventCallbackMapT = std::map; - std::mutex m_event_callback_map_mtx; + std::mutex m_event_callback_map_mutex; EventCallbackMapT m_event_callback_map; + std::mutex m_event_id_callback_mutex; + PubEventIDCallbackT m_event_id_callback; + long long m_id = 0; long long m_clock = 0; - std::mutex m_frequency_calculator_mtx; + std::mutex m_frequency_calculator_mutex; ResettableFrequencyCalculator m_frequency_calculator; #if ECAL_CORE_TRANSPORT_UDP diff --git a/ecal/core/src/pubsub/ecal_publisher_v5.cpp b/ecal/core/src/pubsub/ecal_publisher_v5.cpp new file mode 100644 index 0000000000..6727d70206 --- /dev/null +++ b/ecal/core/src/pubsub/ecal_publisher_v5.cpp @@ -0,0 +1,248 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +/** + * @brief eCAL publisher interface (deprecated eCAL5 version) +**/ + +#include +#include + +#include "ecal_globals.h" +#include "ecal_publisher_impl.h" +#include "ecal_config_internal.h" + +#include "config/builder/writer_attribute_builder.h" +#include "readwrite/ecal_writer_buffer_payload.h" + +#include +#include +#include +#include +#include + +namespace eCAL +{ + namespace v5 + { + CPublisher::CPublisher() : + m_publisher_impl(nullptr), + m_filter_id(0) + { + } + + CPublisher::CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_) + : CPublisher() + { + CPublisher::Create(topic_name_, data_type_info_, config_); + } + + CPublisher::CPublisher(const std::string& topic_name_, const Publisher::Configuration& config_) + : CPublisher(topic_name_, SDataTypeInformation{}, config_) + {} + + CPublisher::~CPublisher() + { + CPublisher::Destroy(); + } + + /** + * @brief CPublisher are move-enabled + **/ + CPublisher::CPublisher(CPublisher&& rhs) noexcept : + m_publisher_impl(std::move(rhs.m_publisher_impl)), + m_filter_id(rhs.m_filter_id) + { + rhs.m_publisher_impl = nullptr; + } + + /** + * @brief CPublisher are move-enabled + **/ + CPublisher& CPublisher::operator=(CPublisher&& rhs) noexcept + { + // Call destroy, to clean up the current state, then afterwards move all elements + Destroy(); + + m_publisher_impl = std::move(rhs.m_publisher_impl); + m_filter_id = rhs.m_filter_id; + + rhs.m_publisher_impl = nullptr; + + return *this; + } + + bool CPublisher::Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_) + { + if (m_publisher_impl != nullptr) return(false); + if (topic_name_.empty()) return(false); + + // create publisher + m_publisher_impl = std::make_shared(data_type_info_, BuildWriterAttributes(topic_name_, config_, eCAL::GetTransportLayerConfiguration(), eCAL::GetRegistrationConfiguration())); + + // register publisher + g_pubgate()->Register(topic_name_, m_publisher_impl); + + // we made it :-) + return(true); + } + + bool CPublisher::Create(const std::string& topic_name_) + { + return Create(topic_name_, SDataTypeInformation()); + } + + bool CPublisher::Destroy() + { + if (m_publisher_impl == nullptr) return(false); + + // unregister publisher + if(g_pubgate() != nullptr) g_pubgate()->Unregister(m_publisher_impl->GetTopicName(), m_publisher_impl); + #ifndef NDEBUG + // log it + eCAL::Logging::Log(log_level_debug1, std::string(m_publisher_impl->GetTopicName() + "::CPublisher::Destroy")); + #endif + + // destroy publisher + m_publisher_impl.reset(); + + // we made it :-) + return(true); + } + + bool CPublisher::SetDataTypeInformation(const SDataTypeInformation& data_type_info_) + { + if (m_publisher_impl == nullptr) return false; + return m_publisher_impl->SetDataTypeInformation(data_type_info_); + } + + bool CPublisher::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) + { + if(m_publisher_impl == nullptr) return false; + return m_publisher_impl->SetAttribute(attr_name_, attr_value_); + } + + bool CPublisher::ClearAttribute(const std::string& attr_name_) + { + if(m_publisher_impl == nullptr) return false; + return m_publisher_impl->ClearAttribute(attr_name_); + } + + bool CPublisher::SetID(long long filter_id_) + { + m_filter_id = filter_id_; + return true; + } + + size_t CPublisher::Send(const void* const buf_, const size_t len_, const long long time_ /* = DEFAULT_TIME_ARGUMENT */) + { + CBufferPayloadWriter payload{ buf_, len_ }; + return Send(payload, time_); + } + + size_t CPublisher::Send(CPayloadWriter& payload_, long long time_) + { + if (m_publisher_impl == nullptr) return 0; + + // in an optimization case the + // publisher can send an empty package + // or we do not have any subscription at all + // then the data writer will only do some statistics + // for the monitoring layer and return + if (!IsSubscribed()) + { + m_publisher_impl->RefreshSendCounter(); + return(payload_.GetSize()); + } + + // send content via data writer layer + const long long write_time = (time_ == DEFAULT_TIME_ARGUMENT) ? eCAL::Time::GetMicroSeconds() : time_; + const size_t written_bytes = m_publisher_impl->Write(payload_, write_time, m_filter_id); + + // return number of bytes written + return written_bytes; + } + + size_t CPublisher::Send(const std::string& s_, long long time_) + { + return(Send(s_.data(), s_.size(), time_)); + } + + bool CPublisher::AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_) + { + if (m_publisher_impl == nullptr) return(false); + RemEventCallback(type_); + return(m_publisher_impl->AddEventCallback(type_, std::move(callback_))); + } + + bool CPublisher::RemEventCallback(eCAL_Publisher_Event type_) + { + if (m_publisher_impl == nullptr) return(false); + return(m_publisher_impl->RemoveEventCallback(type_)); + } + + bool CPublisher::IsSubscribed() const + { + #if ECAL_CORE_REGISTRATION + if(m_publisher_impl == nullptr) return(false); + return(m_publisher_impl->IsSubscribed()); + #else // ECAL_CORE_REGISTRATION + return(true); + #endif // ECAL_CORE_REGISTRATION + } + + size_t CPublisher::GetSubscriberCount() const + { + if (m_publisher_impl == nullptr) return(0); + return(m_publisher_impl->GetSubscriberCount()); + } + + std::string CPublisher::GetTopicName() const + { + if(m_publisher_impl == nullptr) return(""); + return(m_publisher_impl->GetTopicName()); + } + + Registration::STopicId CPublisher::GetId() const + { + if (m_publisher_impl == nullptr) return{}; + return(m_publisher_impl->GetTopicId()); + } + + SDataTypeInformation CPublisher::GetDataTypeInformation() const + { + if (m_publisher_impl == nullptr) return(SDataTypeInformation{}); + return(m_publisher_impl->GetDataTypeInformation()); + } + + std::string CPublisher::Dump(const std::string& indent_ /* = "" */) const + { + std::stringstream out; + + out << indent_ << "----------------------" << '\n'; + out << indent_ << " class CPublisher" << '\n'; + out << indent_ << "----------------------" << '\n'; + out << indent_ << "DUMP NOT SUPPORTED ANYMORE" << '\n'; + out << indent_ << "----------------------" << '\n'; + out << '\n'; + + return(out.str()); + } + } +} diff --git a/ecal/core/src/registration/ecal_registration_sample_applier_gates.cpp b/ecal/core/src/registration/ecal_registration_sample_applier_gates.cpp index e761409695..7b64f47efa 100644 --- a/ecal/core/src/registration/ecal_registration_sample_applier_gates.cpp +++ b/ecal/core/src/registration/ecal_registration_sample_applier_gates.cpp @@ -51,10 +51,10 @@ namespace eCAL break; #if ECAL_CORE_PUBLISHER case bct_reg_subscriber: - if (g_pubgate() != nullptr) g_pubgate()->ApplySubRegistration(sample_); + if (g_pubgate() != nullptr) g_pubgate()->ApplySubscriberRegistration(sample_); break; case bct_unreg_subscriber: - if (g_pubgate() != nullptr) g_pubgate()->ApplySubUnregistration(sample_); + if (g_pubgate() != nullptr) g_pubgate()->ApplySubscriberUnregistration(sample_); break; #endif #if ECAL_CORE_SUBSCRIBER diff --git a/ecal/samples/cpp/benchmarks/counter_snd/src/counter_snd.cpp b/ecal/samples/cpp/benchmarks/counter_snd/src/counter_snd.cpp index 6297d09892..801e8acbe6 100644 --- a/ecal/samples/cpp/benchmarks/counter_snd/src/counter_snd.cpp +++ b/ecal/samples/cpp/benchmarks/counter_snd/src/counter_snd.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,9 +83,6 @@ int main(int argc, char **argv) } } - // destroy publisher - pub.Destroy(); - // finalize eCAL API eCAL::Finalize(); diff --git a/ecal/samples/cpp/benchmarks/datarate_snd/src/datarate_snd.cpp b/ecal/samples/cpp/benchmarks/datarate_snd/src/datarate_snd.cpp index df987b2228..c997e9b638 100644 --- a/ecal/samples/cpp/benchmarks/datarate_snd/src/datarate_snd.cpp +++ b/ecal/samples/cpp/benchmarks/datarate_snd/src/datarate_snd.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -74,7 +74,7 @@ int main(int argc, char **argv) pub_config.layer.shm.acknowledge_timeout_ms = acknowledge_time; // new publisher - eCAL::CPublisher pub(topic_name, pub_config); + eCAL::CPublisher pub(topic_name, eCAL::SDataTypeInformation(), pub_config); // default send string size *= 1024 * 1024; @@ -94,9 +94,6 @@ int main(int argc, char **argv) if(sleep > 0) std::this_thread::sleep_for(std::chrono::milliseconds(sleep)); } - // destroy publisher - pub.Destroy(); - // finalize eCAL API eCAL::Finalize(); diff --git a/ecal/samples/cpp/benchmarks/latency_snd/src/latency_snd.cpp b/ecal/samples/cpp/benchmarks/latency_snd/src/latency_snd.cpp index 33f25c277e..f358dddaf9 100644 --- a/ecal/samples/cpp/benchmarks/latency_snd/src/latency_snd.cpp +++ b/ecal/samples/cpp/benchmarks/latency_snd/src/latency_snd.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,7 +59,7 @@ void do_run(const int runs, int snd_size /*kB*/, int mem_buffer, bool zero_copy) pub_config.layer.shm.acknowledge_timeout_ms = 100; // create publisher - eCAL::CPublisher pub("ping", pub_config); + eCAL::CPublisher pub("ping", eCAL::SDataTypeInformation(), pub_config); // prepare send buffer CBinaryPayload payload(snd_size * 1024); diff --git a/ecal/samples/cpp/benchmarks/multiple_snd/src/multiple_snd.cpp b/ecal/samples/cpp/benchmarks/multiple_snd/src/multiple_snd.cpp index 4a33e06548..127598235e 100644 --- a/ecal/samples/cpp/benchmarks/multiple_snd/src/multiple_snd.cpp +++ b/ecal/samples/cpp/benchmarks/multiple_snd/src/multiple_snd.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,9 +64,7 @@ int main() tname << "PUB_" << i; // publisher topic name - std::shared_ptr pub = std::make_shared(); - - pub->Create(tname.str()); + std::shared_ptr pub = std::make_shared(tname.str()); struct SPubCount pub_count; pub_count.pub = std::move(pub); pub_vec[i] = pub_count; diff --git a/ecal/samples/cpp/benchmarks/performance_snd/src/performance_snd.cpp b/ecal/samples/cpp/benchmarks/performance_snd/src/performance_snd.cpp index 48a2c65bea..e848cdc47e 100644 --- a/ecal/samples/cpp/benchmarks/performance_snd/src/performance_snd.cpp +++ b/ecal/samples/cpp/benchmarks/performance_snd/src/performance_snd.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,7 +62,7 @@ int main(int argc, char **argv) std::cout << std::endl; // create publisher - eCAL::CPublisher pub("Performance", pub_config); + eCAL::CPublisher pub("Performance", eCAL::SDataTypeInformation(), pub_config); // counter long long msgs (0); @@ -110,9 +110,6 @@ int main(int argc, char **argv) } } - // destroy publisher - pub.Destroy(); - // finalize eCAL API eCAL::Finalize(); diff --git a/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp b/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp index 4d5ab63208..2fd5250c7a 100644 --- a/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp +++ b/ecal/samples/cpp/benchmarks/pubsub_throughput/src/pubsub_throughput.cpp @@ -62,7 +62,7 @@ void throughput_test(int snd_size, int snd_loops, eCAL::TLayer::eTransportLayer pub_config.layer.shm.acknowledge_timeout_ms = 100; // create publisher - eCAL::CPublisher pub("throughput", pub_config); + eCAL::CPublisher pub("throughput", eCAL::SDataTypeInformation(), pub_config); // create subscriber eCAL::CSubscriber sub("throughput"); diff --git a/ecal/samples/cpp/pubsub/binary/binary_snd/src/binary_snd.cpp b/ecal/samples/cpp/pubsub/binary/binary_snd/src/binary_snd.cpp index af9bcb57c9..206bd33a62 100644 --- a/ecal/samples/cpp/pubsub/binary/binary_snd/src/binary_snd.cpp +++ b/ecal/samples/cpp/pubsub/binary/binary_snd/src/binary_snd.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/tests/cpp/pubsub_proto_test/src/proto_publisher_test.cpp b/ecal/tests/cpp/pubsub_proto_test/src/proto_publisher_test.cpp index fe351029db..a186107ac0 100644 --- a/ecal/tests/cpp/pubsub_proto_test/src/proto_publisher_test.cpp +++ b/ecal/tests/cpp/pubsub_proto_test/src/proto_publisher_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/ecal/tests/cpp/pubsub_proto_test/src/proto_subscriber_test.cpp b/ecal/tests/cpp/pubsub_proto_test/src/proto_subscriber_test.cpp index 50032dc0ae..3c2f937719 100644 --- a/ecal/tests/cpp/pubsub_proto_test/src/proto_subscriber_test.cpp +++ b/ecal/tests/cpp/pubsub_proto_test/src/proto_subscriber_test.cpp @@ -47,7 +47,7 @@ class ProtoSubscriberTest : public ::testing::Test { eCAL::Finalize(); } - size_t SendPerson(eCAL::protobuf::CPublisher& pub) + bool SendPerson(eCAL::protobuf::CPublisher& pub) { p.set_id(1); p.set_name("Max"); @@ -87,11 +87,10 @@ TEST_F(core_cpp_pubsub_proto_sub, ProtoSubscriberTest_SendReceive) std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - auto bytes_send = SendPerson(person_pub); + ASSERT_TRUE(SendPerson(person_pub)); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // assert that the OnPerson callback has been called once. ASSERT_EQ(1, received_callbacks); - ASSERT_EQ(bytes_send, GetPersonSize()); } TEST_F(core_cpp_pubsub_proto_sub, ProtoSubscriberTest_MoveAssignment) @@ -119,7 +118,7 @@ TEST_F(core_cpp_pubsub_proto_sub, ProtoSubscriberTest_MoveAssignment) std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - SendPerson(person_pub); + ASSERT_TRUE(SendPerson(person_pub)); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // assert that the OnPerson callback has been called once. ASSERT_EQ(1, received_callbacks); @@ -145,7 +144,7 @@ TEST_F(core_cpp_pubsub_proto_sub, ProtoSubscriberTest_MoveConstruction) std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - SendPerson(person_pub); + ASSERT_TRUE(SendPerson(person_pub)); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // assert that the OnPerson callback has been called once. ASSERT_EQ(1, received_callbacks); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_callback_topicid.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_callback_topicid.cpp index 94d8ec26b3..821268f33f 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_callback_topicid.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_callback_topicid.cpp @@ -128,7 +128,7 @@ TEST_P(TestFixture, OnePubSub) callback_datatype_info = datatype_info_; } ); - const auto pub_id = publisher.GetId(); + const auto pub_id = publisher.GetTopicId(); // let them match eCAL::Process::SleepMS(2 * config.registration.registration_refresh); @@ -175,7 +175,7 @@ TEST_P(TestFixture, MultiplePubSub) for (int i = 0; i < num_publishers; ++i) { auto& publisher = publishers[i]; - const auto pub_id = publisher.GetId(); + const auto pub_id = publisher.GetTopicId(); const auto pub_datatype_info = publisher.GetDataTypeInformation(); // send data diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp index 357639ab06..c28f1159a2 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_connection_test.cpp @@ -41,7 +41,7 @@ TEST(core_cpp_pubsub, TestSubscriberIsPublishedTiming) auto publisher_function = [&do_start_publication, &publication_finished, &subscriber_seen_at_publication_start]() { eCAL::Publisher::Configuration pub_config; pub_config.layer.shm.acknowledge_timeout_ms = 500; - eCAL::CPublisher pub("blob", pub_config); + eCAL::CPublisher pub("blob", eCAL::SDataTypeInformation(), pub_config); int pub_count(0); const auto max_pub_count(1000); @@ -51,7 +51,7 @@ TEST(core_cpp_pubsub, TestSubscriberIsPublishedTiming) { if (pub_count == 0) { - subscriber_seen_at_publication_start = pub.IsSubscribed(); + subscriber_seen_at_publication_start = pub.GetSubscriberCount() > 0; } pub.Send(std::to_string(pub_count)); @@ -140,13 +140,13 @@ TEST(core_cpp_pubsub, TestPublisherIsSubscribedTiming) auto publisher_function = [&do_start_publication, &publication_finished]() { eCAL::Publisher::Configuration pub_config; pub_config.layer.shm.acknowledge_timeout_ms = 500; - eCAL::CPublisher pub("blob", pub_config); + eCAL::CPublisher pub("blob", eCAL::SDataTypeInformation(), pub_config); int cnt(0); const auto max_runs(1000); while (eCAL::Ok()) { - if (pub.IsSubscribed()) + if (pub.GetSubscriberCount() > 0) { do_start_publication = true; } @@ -236,7 +236,7 @@ TEST(core_cpp_pubsub, TestChainedPublisherSubscriberCallback) // Publisher1 in thread 1 auto publisher1_function = [&publisher1_sent_count, &message_count]() { eCAL::CPublisher pub1("topic1"); - while (!pub1.IsSubscribed()) + while (pub1.GetSubscriberCount() == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp index 826de1c97e..cd04ac99af 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp @@ -91,7 +91,7 @@ std::vector multibuffer_pub_sub_test(int buffer_count, bool zero_copy, int pub_config.layer.shm.memfile_buffer_count = buffer_count; // create publisher for topic "A" - eCAL::CPublisher pub("A", pub_config); + eCAL::CPublisher pub("A", eCAL::SDataTypeInformation(), pub_config); std::atomic received_count{ 0 }; std::atomic received_bytes{ 0 }; @@ -117,7 +117,7 @@ std::vector multibuffer_pub_sub_test(int buffer_count, bool zero_copy, int // run publications for (int i = 0; i < publications; ++i) { - EXPECT_EQ(PAYLOAD_SIZE_BYTE, pub.Send(binary_payload)); + EXPECT_EQ(true, pub.Send(binary_payload)); eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); } diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp index 44f7b9474d..dd8d66f06f 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp @@ -161,45 +161,6 @@ TEST(core_cpp_pubsub, CallbackDestruction) EXPECT_EQ(0, eCAL::Finalize()); } -TEST(core_cpp_pubsub, CreateDestroy) -{ - // initialize eCAL API - eCAL::Initialize("pubsub_test"); - - // create publisher for topic "foo" - eCAL::CPublisher pub; - - // check state - EXPECT_EQ(false, pub.IsCreated()); - - // create - EXPECT_EQ(true, pub.Create("foo")); - - // check state - EXPECT_EQ(true, pub.IsCreated()); - - // create subscriber for topic "foo" - eCAL::CSubscriber sub; - - // check state - EXPECT_EQ(false, sub.IsCreated()); - - // create - EXPECT_EQ(true, sub.Create("foo")); - - // check state - EXPECT_EQ(true, sub.IsCreated()); - - // destroy publisher - EXPECT_EQ(true, pub.Destroy()); - - // destroy subscriber - EXPECT_EQ(true, sub.Destroy()); - - // finalize eCAL API - eCAL::Finalize(); -} - TEST(core_cpp_pubsub, SimpleMessage1) { // default send / receive strings @@ -219,7 +180,7 @@ TEST(core_cpp_pubsub, SimpleMessage1) eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // send content - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // receive content with DATA_FLOW_TIME_MS timeout recv_s.clear(); @@ -232,9 +193,6 @@ TEST(core_cpp_pubsub, SimpleMessage1) EXPECT_EQ(false, sub.ReceiveBuffer(recv_s, nullptr, DATA_FLOW_TIME_MS)); EXPECT_EQ(0, recv_s.size()); - // destroy publisher - pub.Destroy(); - // destroy subscriber sub.Destroy(); @@ -261,16 +219,13 @@ TEST(core_cpp_pubsub, SimpleMessage2) eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); // send content - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // receive content with DATA_FLOW_TIME_MS timeout recv_s.clear(); EXPECT_EQ(true, sub.ReceiveBuffer(recv_s, nullptr, DATA_FLOW_TIME_MS)); EXPECT_EQ(send_s.size(), recv_s.size()); - // destroy publisher - pub.Destroy(); - // destroy subscriber sub.Destroy(); @@ -300,7 +255,7 @@ TEST(core_cpp_pubsub, SimpleMessageCB) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -313,7 +268,7 @@ TEST(core_cpp_pubsub, SimpleMessageCB) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -326,7 +281,7 @@ TEST(core_cpp_pubsub, SimpleMessageCB) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -339,7 +294,7 @@ TEST(core_cpp_pubsub, SimpleMessageCB) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -347,9 +302,6 @@ TEST(core_cpp_pubsub, SimpleMessageCB) // check callback receive EXPECT_EQ(0, g_callback_received_bytes); - // destroy publisher - pub.Destroy(); - // finalize eCAL API eCAL::Finalize(); } @@ -376,7 +328,7 @@ TEST(core_cpp_pubsub, DynamicSizeCB) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -389,7 +341,7 @@ TEST(core_cpp_pubsub, DynamicSizeCB) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_EQ(true, pub.Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -397,9 +349,6 @@ TEST(core_cpp_pubsub, DynamicSizeCB) // check callback receive EXPECT_EQ(send_s.size(), g_callback_received_bytes); - // destroy publisher - pub.Destroy(); - // destroy subscriber sub.Destroy(); @@ -431,7 +380,7 @@ TEST(core_cpp_pubsub, DynamicCreate) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub->Send(send_s)); + EXPECT_EQ(true, pub->Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -454,7 +403,7 @@ TEST(core_cpp_pubsub, DynamicCreate) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub->Send(send_s)); + EXPECT_EQ(true, pub->Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); @@ -476,7 +425,7 @@ TEST(core_cpp_pubsub, DynamicCreate) // send content g_callback_received_bytes = 0; - EXPECT_EQ(send_s.size(), pub->Send(send_s)); + EXPECT_EQ(true, pub->Send(send_s)); // let the data flow eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp index 6e573ad8f2..176acd6ac9 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test_shm.cpp @@ -65,11 +65,11 @@ TEST(core_cpp_pubsub, ZeroPayloadMessageSHM) pub_config.layer.tcp.enable = false; // create publisher for topic "A" (no zero copy) - eCAL::CPublisher pub1("A", pub_config); + eCAL::CPublisher pub1("A", eCAL::SDataTypeInformation(), pub_config); // switch on zero copy pub_config.layer.shm.zero_copy_mode = true; - eCAL::CPublisher pub2("A", pub_config); + eCAL::CPublisher pub2("A", eCAL::SDataTypeInformation(), pub_config); // add callback @@ -82,17 +82,17 @@ TEST(core_cpp_pubsub, ZeroPayloadMessageSHM) g_callback_received_count = 0; // send without zero copy - EXPECT_EQ(send_s.size(), pub1.Send(send_s)); + EXPECT_TRUE(pub1.Send(send_s)); eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); - EXPECT_EQ(send_s.size(), pub1.Send(nullptr, 0)); + EXPECT_TRUE(pub1.Send(nullptr, 0)); eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); // send with zero copy - EXPECT_EQ(send_s.size(), pub2.Send(send_s)); + EXPECT_TRUE(pub2.Send(send_s)); eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); - EXPECT_EQ(send_s.size(), pub2.Send(nullptr, 0)); + EXPECT_TRUE(pub2.Send(nullptr, 0)); eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); // check callback receive @@ -102,10 +102,6 @@ TEST(core_cpp_pubsub, ZeroPayloadMessageSHM) // destroy subscriber sub.Destroy(); - // destroy publisher - pub1.Destroy(); - pub2.Destroy(); - // finalize eCAL API eCAL::Finalize(); } diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp index ea4b4d7d2e..d6784c1b45 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_test_udp.cpp @@ -65,7 +65,7 @@ TEST(core_cpp_pubsub, ZeroPayloadMessageUDP) pub_config.layer.tcp.enable = false; // create publisher for topic "A" - eCAL::CPublisher pub("A", pub_config); + eCAL::CPublisher pub("A", eCAL::SDataTypeInformation(), pub_config); // add callback EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2))); @@ -76,22 +76,16 @@ TEST(core_cpp_pubsub, ZeroPayloadMessageUDP) g_callback_received_bytes = 0; g_callback_received_count = 0; - EXPECT_EQ(send_s.size(), pub.Send(send_s)); + EXPECT_TRUE(pub.Send(send_s)); eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); - EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0)); + EXPECT_TRUE(pub.Send(nullptr, 0)); eCAL::Process::SleepMS(DATA_FLOW_TIME_MS); // check callback receive EXPECT_EQ(send_s.size(), g_callback_received_bytes); EXPECT_EQ(2, g_callback_received_count); - // destroy subscriber - sub.Destroy(); - - // destroy publisher - pub.Destroy(); - // finalize eCAL API eCAL::Finalize(); } @@ -139,12 +133,6 @@ TEST(core_cpp_pubsub, MultipleSendsUDP) ++timestamp; } - // destroy subscriber - sub.Destroy(); - - // destroy publisher - pub.Destroy(); - // finalize eCAL API eCAL::Finalize(); } diff --git a/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp b/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp index bb4d4869e7..aad32a42e2 100644 --- a/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp +++ b/ecal/tests/cpp/registration_test_public/src/registration_gettopics.cpp @@ -52,13 +52,13 @@ TEST(core_cpp_registration_public, GetTopics) eCAL::SDataTypeInformation info_B2 { "typeB2" ,"", "descB2" }; // create 3 publisher - eCAL::CPublisher pub1("A1", info_A1); - eCAL::CPublisher pub2("A2", info_A2); - eCAL::CPublisher pub3("A3", info_A3); + auto pub1 = std::make_shared("A1", info_A1); + auto pub2 = std::make_shared("A2", info_A2); + auto pub3 = std::make_shared("A3", info_A3); // create 2 subscriber - eCAL::CSubscriber sub1("B1", info_B1); - eCAL::CSubscriber sub2("B2", info_B2); + auto sub1 = std::make_shared("B1", info_B1); + auto sub2 = std::make_shared("B2", info_B2); // let's register eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); @@ -103,8 +103,8 @@ TEST(core_cpp_registration_public, GetTopics) // now destroy publisher pub1 and subscriber sub1 // the entities pub12 and sub12 should replace them // by overwriting their type names and descriptions - pub1.Destroy(); - sub1.Destroy(); + pub1.reset(); + sub1.reset(); // let's register eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH_MS); @@ -133,7 +133,7 @@ TEST(core_cpp_registration_public, GetTopics) eCAL::Finalize(); } -// This test creates a reall big number of publishers. +// This test creates a real big number of publishers. // It then checks, if they have all been seen using GetTopics() // And the count is back to 0 upon completion. TEST(core_cpp_registration_public, GetTopicsParallel) diff --git a/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp b/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp index 6e520b3df1..1fcb38b854 100644 --- a/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp +++ b/lang/csharp/Continental.eCAL.Core/ecal_clr.cpp @@ -113,7 +113,7 @@ void Logger::Log(System::String^ message_) ///////////////////////////////////////////////////////////////////////////// // Publisher ///////////////////////////////////////////////////////////////////////////// -Publisher::Publisher() : m_pub(new ::eCAL::CPublisher()) +Publisher::Publisher() : m_pub(new ::eCAL::v5::CPublisher()) { } @@ -123,7 +123,7 @@ Publisher::Publisher(System::String^ topic_name_, System::String^ topic_type_, S topic_info.name = StringToStlString(topic_type_); topic_info.encoding = StringToStlString(topic_encoding_); topic_info.descriptor = StringToStlString(topic_desc_); - m_pub = new ::eCAL::CPublisher(StringToStlString(topic_name_), topic_info); + m_pub = new ::eCAL::v5::CPublisher(StringToStlString(topic_name_), topic_info); } Publisher::Publisher(System::String^ topic_name_, System::String^ topic_type_, System::String^ topic_encoding_, array^ topic_desc_) @@ -132,7 +132,7 @@ Publisher::Publisher(System::String^ topic_name_, System::String^ topic_type_, S topic_info.name = StringToStlString(topic_type_); topic_info.encoding = StringToStlString(topic_encoding_); topic_info.descriptor = ByteArrayToStlString(topic_desc_); - m_pub = new ::eCAL::CPublisher(StringToStlString(topic_name_), topic_info); + m_pub = new ::eCAL::v5::CPublisher(StringToStlString(topic_name_), topic_info); } Publisher::~Publisher() diff --git a/lang/csharp/Continental.eCAL.Core/ecal_clr.h b/lang/csharp/Continental.eCAL.Core/ecal_clr.h index b1f97ab2b5..9856f2a113 100644 --- a/lang/csharp/Continental.eCAL.Core/ecal_clr.h +++ b/lang/csharp/Continental.eCAL.Core/ecal_clr.h @@ -26,7 +26,7 @@ #include #include #include - +#include using namespace System; using namespace System::Collections::Generic; using namespace System::Runtime::InteropServices; @@ -209,7 +209,7 @@ namespace Continental System::String^ Dump(); private: - ::eCAL::CPublisher* m_pub; + ::eCAL::v5::CPublisher* m_pub; }; diff --git a/lang/python/core/src/ecal_clang.cpp b/lang/python/core/src/ecal_clang.cpp index 10b4d014dc..68640d1f5e 100644 --- a/lang/python/core/src/ecal_clang.cpp +++ b/lang/python/core/src/ecal_clang.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include "ecal_clang.h" @@ -293,7 +294,7 @@ ECAL_HANDLE pub_create(const char* topic_name_, const char* topic_type_, const c topic_info.encoding = topic_enc_; topic_info.descriptor = std::string(topic_desc_, static_cast(topic_desc_length_)); - auto* pub = new eCAL::CPublisher; + auto* pub = new eCAL::v5::CPublisher; if (!pub->Create(topic_name_, topic_info)) { delete pub; @@ -307,7 +308,7 @@ ECAL_HANDLE pub_create(const char* topic_name_, const char* topic_type_, const c /****************************************/ bool pub_destroy(ECAL_HANDLE handle_) { - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); if(pub != nullptr) { delete pub; @@ -324,7 +325,7 @@ bool pub_destroy(ECAL_HANDLE handle_) /****************************************/ int pub_send(ECAL_HANDLE handle_, const char* payload_, const int length_, const long long time_) { - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); if(pub != nullptr) { const size_t ret = pub->Send(payload_, static_cast(length_), time_); @@ -356,7 +357,7 @@ static void g_pub_event_callback(const char* topic_name_, const struct eCAL::SPu bool pub_add_event_callback(ECAL_HANDLE handle_, enum eCAL_Publisher_Event type_, const PubEventCallbackCT callback_, void* par_) { - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); auto callback = std::bind(g_pub_event_callback, std::placeholders::_1, std::placeholders::_2, callback_, par_); return(pub->AddEventCallback(type_, callback)); @@ -367,7 +368,7 @@ bool pub_add_event_callback(ECAL_HANDLE handle_, enum eCAL_Publisher_Event type_ /****************************************/ bool pub_rem_event_callback(ECAL_HANDLE handle_, enum eCAL_Publisher_Event type_) { - auto* pub = static_cast(handle_); + auto* pub = static_cast(handle_); return(pub->RemEventCallback(type_)); }