Skip to content

Commit

Permalink
[core] new publisher config API (#1561)
Browse files Browse the repository at this point in the history
* new publisher config API CPublisher::Configuration
* slightly refactored CWriter(UDP,SHM,TCP) design
* new pub/sub transport layer match logic driven by subscriber prepared
  • Loading branch information
rex-schilasky authored May 16, 2024
1 parent 1e63d97 commit f3312fa
Show file tree
Hide file tree
Showing 82 changed files with 2,033 additions and 1,813 deletions.
10 changes: 6 additions & 4 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ endif()
if(ECAL_CORE_PUBLISHER)
set(ecal_pub_src
src/pubsub/ecal_publisher.cpp
src/pubsub/ecal_publisher_config.cpp
src/pubsub/ecal_pubgate.cpp
src/pubsub/ecal_pubgate.h
)
Expand Down Expand Up @@ -222,8 +223,8 @@ if(ECAL_CORE_PUBLISHER)
)
if(ECAL_CORE_TRANSPORT_UDP)
list(APPEND ecal_writer_src
src/readwrite/udp/ecal_writer_udp_mc.cpp
src/readwrite/udp/ecal_writer_udp_mc.h
src/readwrite/udp/ecal_writer_udp.cpp
src/readwrite/udp/ecal_writer_udp.h
)
endif()
if(ECAL_CORE_TRANSPORT_TCP)
Expand All @@ -249,8 +250,8 @@ if(ECAL_CORE_SUBSCRIBER)
)
if(ECAL_CORE_TRANSPORT_UDP)
list(APPEND ecal_reader_src
src/readwrite/udp/ecal_reader_udp_mc.cpp
src/readwrite/udp/ecal_reader_udp_mc.h
src/readwrite/udp/ecal_reader_udp.cpp
src/readwrite/udp/ecal_reader_udp.h
)
endif()
if(ECAL_CORE_TRANSPORT_TCP)
Expand Down Expand Up @@ -463,6 +464,7 @@ set(ecal_header_cmn
include/ecal/ecal_process.h
include/ecal/ecal_process_severity.h
include/ecal/ecal_publisher.h
include/ecal/ecal_publisher_config.h
include/ecal/ecal_server.h
include/ecal/ecal_service_info.h
include/ecal/ecal_subscriber.h
Expand Down
4 changes: 1 addition & 3 deletions ecal/core/include/ecal/cimpl/ecal_core_cimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,9 @@ extern "C"
/**
* @brief Finalize eCAL API.
*
* @param components_ Defines which component to initialize (not yet supported).
*
* @return Zero if succeeded, 1 if already finalized, -1 if failed.
**/
ECALC_API int eCAL_Finalize(unsigned int components_);
ECALC_API int eCAL_Finalize();

/**
* @brief Check eCAL initialize state.
Expand Down
20 changes: 0 additions & 20 deletions ecal/core/include/ecal/cimpl/ecal_publisher_cimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,6 @@ extern "C"
**/
ECALC_API int eCAL_Pub_ClearAttribute(ECAL_HANDLE handle_, const char* attr_name_, int attr_name_len_);

/**
* @brief Share topic type.
*
* @param handle_ Publisher handle.
* @param state_ Set type share mode (none zero == share type).
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Pub_ShareType(ECAL_HANDLE handle_, int state_);

/**
* @brief Share topic description.
*
* @param handle_ Publisher handle.
* @param state_ Set description share mode (none zero == share description).
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Pub_ShareDescription(ECAL_HANDLE handle_, int state_);

/**
* @brief Set the specific topic id.
*
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/ecal_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,6 @@ namespace eCAL

protected:
std::shared_ptr<eCAL::CServiceClientImpl> m_service_client_impl;
bool m_created;
bool m_created;
};
}
4 changes: 1 addition & 3 deletions ecal/core/include/ecal/ecal_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ namespace eCAL
/**
* @brief Finalize eCAL API.
*
* @param components_ Defines which component to finalize (not yet supported).
*
* @return Zero if succeeded, 1 if already finalized, -1 if failed.
**/
ECAL_API int Finalize(unsigned int components_ = Init::Default);
ECAL_API int Finalize();

/**
* @brief Check eCAL initialize state.
Expand Down
48 changes: 16 additions & 32 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <ecal/ecal_deprecate.h>
#include <ecal/ecal_os.h>
#include <ecal/ecal_payload_writer.h>
#include <ecal/ecal_publisher_config.h>
#include <ecal/ecal_types.h>

#include <chrono>
Expand Down Expand Up @@ -69,7 +70,6 @@ namespace eCAL
class CPublisher
{
public:

ECAL_API static constexpr long long DEFAULT_TIME_ARGUMENT = -1; /*!< Use DEFAULT_TIME_ARGUMENT in the `Send()` function to let eCAL determine the send timestamp */

/**
Expand All @@ -80,17 +80,19 @@ namespace eCAL
/**
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param topic_info_ Topic information (encoding, type, descriptor)
* @param topic_name_ Unique topic name.
* @param data_type_info_ Topic data type information (encoding, type, descriptor).
* @param config_ Optional configuration parameters.
**/
ECAL_API CPublisher(const std::string& topic_name_, const SDataTypeInformation& topic_info_);
ECAL_API CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = {});

/**
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param config_ Optional configuration parameters.
**/
ECAL_API explicit CPublisher(const std::string& topic_name_);
ECAL_API explicit CPublisher(const std::string& topic_name_, const Publisher::Configuration& config_ = {});

/**
* @brief Destructor.
Expand Down Expand Up @@ -120,12 +122,13 @@ namespace eCAL
/**
* @brief Creates this object.
*
* @param topic_name_ Unique topic name.
* @param topic_info_ Topic information (encoding, type, descriptor)
* @param topic_name_ Unique topic name.
* @param data_type_info_ Topic data type information (encoding, type, descriptor).
* @param config_ Optional configuration parameters.
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_);
ECAL_API bool Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = {});

/**
* @brief Creates this object.
Expand All @@ -146,11 +149,11 @@ namespace eCAL
/**
* @brief Setup topic information.
*
* @param topic_info_ Topic information attributes.
* @param data_type_info_ Topic data type information attributes.
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool SetDataTypeInformation(const SDataTypeInformation& topic_info_);
ECAL_API bool SetDataTypeInformation(const SDataTypeInformation& data_type_info_);

/**
* @brief Sets publisher attribute.
Expand All @@ -172,24 +175,6 @@ namespace eCAL
**/
ECAL_API bool ClearAttribute(const std::string& attr_name_);

/**
* @brief Share topic type.
*
* @param state_ Set type share mode (true == share type).
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool ShareType(bool state_ = true);

/**
* @brief Share topic description.
*
* @param state_ Set description share mode (true == share description).
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool ShareDescription(bool state_ = true);

/**
* @brief Set the specific topic id.
*
Expand Down Expand Up @@ -295,9 +280,8 @@ namespace eCAL

protected:
// class members
std::shared_ptr<CDataWriter> m_datawriter;
long long m_id;
bool m_created;
bool m_initialized;
std::shared_ptr<CDataWriter> m_datawriter;
long long m_id;
bool m_created;
};
}
143 changes: 143 additions & 0 deletions ecal/core/include/ecal/ecal_publisher_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

/**
* @file ecal_publisher_config.h
* @brief eCAL publisher configuration
*
* This publisher configuration struct can be used to define the behavior of an eCAL publisher. Additional information on
* selected configuration parameters:
*
* --------------------------------------------------------------------------------------------------------------
* Zero copy shared memory transport mode (SHM::Configuration::zero_copy_mode)
* --------------------------------------------------------------------------------------------------------------
*
* By default, the built-in shared memory layer is configured to make two memory copies
* one on the publisher and one on the subscriber side.
*
* The intention of this implementation is to free the file as fast as possible after writing and reading
* its content to allow other processes to access the content with minimal latency. The publisher and subscribers
* are fully decoupled and can access their internal memory copy independently.
*
* If the zero copy mode is switched on no memory will be copied at all using the low level binary publish / subscribe API.
* On publisher side the memory copy is exectuted into the opened memory file. On the subscriber side the user message
* callback is called right after opening the memory file. A direct pointer to the memory payload is forwarded
* and can be processed with no latency. The memory file will be closed after the user callback function
* returned.
*
* The advantage of this configuration is a much higher performance for large payloads (> 1024 kB).
* The disadvantage of this configuration is that in the time when the callback is executed the memory file
* is blocked for other subscribers and for writing publishers too. Maybe this can be eliminated
* by a better memory file read/write access implementation (lock free read) in future releases.
*
* Today, for specific scenarios (1:1 pub/sub connections with large payloads for example) this feature
* can increase the performance remarkable. But please keep in mind to return from the message callback function
* as fast as possible to not delay subsequent read/write access operations.
*
* By using the eCAL::CPayloadWriter API a full zero copy implementation is possible by providing separate methods
* for the initialization and the modification of the memory file content (see CPayloadWriter documentation).
*
*
* --------------------------------------------------------------------------------------------------------------
* Subscriber receive acknowledgement with timeout (SHM::Configuration::acknowledge_timeout_ms)
* --------------------------------------------------------------------------------------------------------------
*
* Most applications perform very well with the default behavior. If subscribers are too slow
* to process incoming messages then the overall software architecture needs to be checked, software components
* need to be optimized or parallelized.
*
* There may still be cases where it could make sense to synchronize the transfer of the payload from a publisher
* to a subscriber by using an additional handshake event. This event is signaled by a subscriber back to the
* sending publisher to confirm the complete payload transmission and the processed subscriber callback.
*
* The publisher will wait up to the specified timeout for the acknowledge signals of all connected subscribers
* before sending new content. Finally that means the publishers CPublisher::Send API function call is now blocked
* and will not return until all subscriber have read and processed their content or the timeout has been reached.
*
*
* --------------------------------------------------------------------------------------------------------------
* Number of handled memory files (SHM::Configuration::memfile_buffer_count)
* --------------------------------------------------------------------------------------------------------------
*
* By default, each publisher creates one memory file to distribute its payload to the subscribers. Since eCAL does not
* currently support a rw lock synchronisation mechanism for interprocess communication, reading subscribers are blocking
* the memory file and thus are preventing a publisher from writing the next payload into the file.
*
* This blocking behavior can be mitigated by using multiple memory files per publisher/subscriber connection. These memory
* files are then written in a kind of ring buffer.
*
* The disadvantage of this setting (memfile_buffer_count > 1) is the higher consumption of resources (memory files, events..)
*
**/

#pragma once

#include <ecal/ecal_tlayer.h>

#include <cstddef>

namespace eCAL
{
namespace Publisher
{
namespace SHM
{
struct ECAL_API Configuration
{
bool enable = false; //!< enable layer
bool zero_copy_mode = false; //!< enable zero copy shared memory transport mode
int acknowledge_timeout_ms = 0; /*!< force connected subscribers to send acknowledge event after processing the message
the publisher send call is blocked on this event with this timeout (0 == no handshake) */
size_t memfile_min_size_bytes = 4096; //!< default memory file size for new publisher
size_t memfile_reserve_percent = 50; //!< dynamic file size reserve before recreating memory file if topic size changes
size_t memfile_buffer_count = 1; //!< maximum number of used buffers (needs to be greater than 1, default = 1)
};
}

namespace UDP
{
struct ECAL_API Configuration
{
bool enable = false; //!< enable layer
bool loopback = false; //!< enable to receive udp messages on the same local machine
int sndbuf_size_bytes = (5*1024*1024); //!< udp send buffer size in bytes (default 5MB)
};
}

namespace TCP
{
struct ECAL_API Configuration
{
bool enable = false; //!< enable layer
};
}

struct ECAL_API Configuration
{
Configuration();

SHM::Configuration shm;
UDP::Configuration udp;
TCP::Configuration tcp;

bool share_topic_type = true; //!< share topic type via registration
bool share_topic_description = true; //!< share topic description via registration
};
}
}
5 changes: 2 additions & 3 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ namespace eCAL

protected:
// class members
std::shared_ptr<CDataReader> m_datareader;
bool m_created;
bool m_initialized;
std::shared_ptr<CDataReader> m_datareader;
bool m_created;
};
}
1 change: 1 addition & 0 deletions ecal/core/include/ecal/ecal_tlayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace eCAL
smode_auto
};

// TODO: Do weed need this ?
/**
* @brief eCAL transport layer state struct.
**/
Expand Down
Loading

0 comments on commit f3312fa

Please sign in to comment.