Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] publisher config API #1561

Merged
merged 27 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
22d15d1
new publisher config API first draft
rex-schilasky Apr 23, 2024
1fc0e3d
Merge branch 'master' into feature/parameter-api-publisher
rex-schilasky Apr 24, 2024
4462bd6
some publisher tests reactivated (layer depending or shm layer parame…
rex-schilasky Apr 25, 2024
82c4269
person snd udp sample added
rex-schilasky Apr 26, 2024
d3a8b46
Merge branch 'master' into feature/parameter-api-publisher
rex-schilasky Apr 26, 2024
f3a58f3
person_snd_udp sample fixed
rex-schilasky Apr 26, 2024
0ee1477
buffer count needs to be set after creation of shm writer
rex-schilasky Apr 26, 2024
be6cf11
CPublisher::Config used in all writers
rex-schilasky Apr 29, 2024
5d14fc7
clang-tidy
rex-schilasky Apr 29, 2024
b57db88
pubsub tests reclustered (shm, udp, tcp)
rex-schilasky Apr 30, 2024
991179d
more pubsub tests sorted out for shm only
rex-schilasky Apr 30, 2024
010bbdf
publisher config separated
rex-schilasky May 2, 2024
6818c2c
clang-tidy + PubConfig fixed
rex-schilasky May 2, 2024
eb93882
PubConfig -> Publisher::Configuration
rex-schilasky May 6, 2024
0b4f932
improved documentation
rex-schilasky May 6, 2024
37b6fdc
internal reader/writer map handling redesigned (one common map instea…
rex-schilasky May 8, 2024
0f79cd1
renamings, layer info added to internal pub/sub maps
rex-schilasky May 8, 2024
c2970dd
atomic Initialization
rex-schilasky May 8, 2024
1190753
Stop function introduced (former Destroy) to shutdown (publisher, sub…
rex-schilasky May 10, 2024
18d98be
core_cpp_pubsub, TimeoutAcknowledgment timing changed (failed on GH a…
rex-schilasky May 11, 2024
7804f09
changed naming activate to enable for shm, udp, tcp layer configuration
rex-schilasky May 13, 2024
892c73f
clang-tidy
rex-schilasky May 13, 2024
77eb296
memfile buffer number removed from writer attributes (can not be chan…
rex-schilasky May 14, 2024
d78c4d3
single component finalizing removed
rex-schilasky May 14, 2024
d0d034e
eCAL::Finalize() fixed (no more parameter)
rex-schilasky May 15, 2024
90dd2da
cleanups
rex-schilasky May 16, 2024
b08bec3
fixed zero copy mode on/off logic
rex-schilasky May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ endif()
if(ECAL_CORE_PUBLISHER)
set(ecal_pub_src
src/pubsub/ecal_publisher.cpp
src/pubsub/ecal_publisher_config.cpp
src/pubsub/ecal_pubgate.cpp
src/pubsub/ecal_pubgate.h
)
Expand Down Expand Up @@ -222,8 +223,8 @@ if(ECAL_CORE_PUBLISHER)
)
if(ECAL_CORE_TRANSPORT_UDP)
list(APPEND ecal_writer_src
src/readwrite/udp/ecal_writer_udp_mc.cpp
src/readwrite/udp/ecal_writer_udp_mc.h
src/readwrite/udp/ecal_writer_udp.cpp
src/readwrite/udp/ecal_writer_udp.h
)
endif()
if(ECAL_CORE_TRANSPORT_TCP)
Expand All @@ -249,8 +250,8 @@ if(ECAL_CORE_SUBSCRIBER)
)
if(ECAL_CORE_TRANSPORT_UDP)
list(APPEND ecal_reader_src
src/readwrite/udp/ecal_reader_udp_mc.cpp
src/readwrite/udp/ecal_reader_udp_mc.h
src/readwrite/udp/ecal_reader_udp.cpp
src/readwrite/udp/ecal_reader_udp.h
)
endif()
if(ECAL_CORE_TRANSPORT_TCP)
Expand Down Expand Up @@ -463,6 +464,7 @@ set(ecal_header_cmn
include/ecal/ecal_process.h
include/ecal/ecal_process_severity.h
include/ecal/ecal_publisher.h
include/ecal/ecal_publisher_config.h
include/ecal/ecal_server.h
include/ecal/ecal_service_info.h
include/ecal/ecal_subscriber.h
Expand Down
20 changes: 0 additions & 20 deletions ecal/core/include/ecal/cimpl/ecal_publisher_cimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,6 @@ extern "C"
**/
ECALC_API int eCAL_Pub_ClearAttribute(ECAL_HANDLE handle_, const char* attr_name_, int attr_name_len_);

/**
* @brief Share topic type.
*
* @param handle_ Publisher handle.
* @param state_ Set type share mode (none zero == share type).
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Pub_ShareType(ECAL_HANDLE handle_, int state_);

/**
* @brief Share topic description.
*
* @param handle_ Publisher handle.
* @param state_ Set description share mode (none zero == share description).
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Pub_ShareDescription(ECAL_HANDLE handle_, int state_);

/**
* @brief Set the specific topic id.
*
Expand Down
41 changes: 13 additions & 28 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <ecal/ecal_deprecate.h>
#include <ecal/ecal_os.h>
#include <ecal/ecal_payload_writer.h>
#include <ecal/ecal_publisher_config.h>
#include <ecal/ecal_types.h>

#include <chrono>
Expand Down Expand Up @@ -69,7 +70,6 @@ namespace eCAL
class CPublisher
{
public:

ECAL_API static constexpr long long DEFAULT_TIME_ARGUMENT = -1; /*!< Use DEFAULT_TIME_ARGUMENT in the `Send()` function to let eCAL determine the send timestamp */

/**
Expand All @@ -80,17 +80,19 @@ namespace eCAL
/**
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param topic_info_ Topic information (encoding, type, descriptor)
* @param topic_name_ Unique topic name.
* @param data_type_info_ Topic data type information (encoding, type, descriptor).
* @param config_ Optional configuration parameters.
**/
ECAL_API CPublisher(const std::string& topic_name_, const SDataTypeInformation& topic_info_);
ECAL_API CPublisher(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = {});

/**
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param config_ Optional configuration parameters.
**/
ECAL_API explicit CPublisher(const std::string& topic_name_);
ECAL_API explicit CPublisher(const std::string& topic_name_, const Publisher::Configuration& config_ = {});

/**
* @brief Destructor.
Expand Down Expand Up @@ -120,12 +122,13 @@ namespace eCAL
/**
* @brief Creates this object.
*
* @param topic_name_ Unique topic name.
* @param topic_info_ Topic information (encoding, type, descriptor)
* @param topic_name_ Unique topic name.
* @param data_type_info_ Topic data type information (encoding, type, descriptor).
* @param config_ Optional configuration parameters.
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool Create(const std::string& topic_name_, const SDataTypeInformation& topic_info_);
ECAL_API bool Create(const std::string& topic_name_, const SDataTypeInformation& data_type_info_, const Publisher::Configuration& config_ = {});

/**
* @brief Creates this object.
Expand All @@ -146,11 +149,11 @@ namespace eCAL
/**
* @brief Setup topic information.
*
* @param topic_info_ Topic information attributes.
* @param data_type_info_ Topic data type information attributes.
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool SetDataTypeInformation(const SDataTypeInformation& topic_info_);
ECAL_API bool SetDataTypeInformation(const SDataTypeInformation& data_type_info_);

/**
* @brief Sets publisher attribute.
Expand All @@ -172,24 +175,6 @@ namespace eCAL
**/
ECAL_API bool ClearAttribute(const std::string& attr_name_);

/**
* @brief Share topic type.
*
* @param state_ Set type share mode (true == share type).
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool ShareType(bool state_ = true);

/**
* @brief Share topic description.
*
* @param state_ Set description share mode (true == share description).
*
* @return True if it succeeds, false if it fails.
**/
ECAL_API bool ShareDescription(bool state_ = true);

/**
* @brief Set the specific topic id.
*
Expand Down
142 changes: 142 additions & 0 deletions ecal/core/include/ecal/ecal_publisher_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ========================= eCAL LICENSE =================================
*/

/**
* @file ecal_publisher_config.h
* @brief eCAL publisher configuration
*
* This publisher configuration struct can be used to define the behavior of an eCAL publisher. Additional information on
* selected configuration parameters:
*
* --------------------------------------------------------------------------------------------------------------
* Zero copy shared memory transport mode (SHM::Configuration::zero_copy_mode)
* --------------------------------------------------------------------------------------------------------------
*
* By default, the built-in shared memory layer is configured to make two memory copies
* one on the publisher and one on the subscriber side.
*
* The intention of this implementation is to free the file as fast as possible after writing and reading
* its content to allow other processes to access the content with minimal latency. The publisher and subscribers
* are fully decoupled and can access their internal memory copy independently.
*
* If the zero copy mode is switched on no memory will be copied at all using the low level binary publish / subscribe API.
* On publisher side the memory copy is exectuted into the opened memory file. On the subscriber side the user message
* callback is called right after opening the memory file. A direct pointer to the memory payload is forwarded
* and can be processed with no latency. The memory file will be closed after the user callback function
* returned.
*
* The advantage of this configuration is a much higher performance for large payloads (> 1024 kB).
* The disadvantage of this configuration is that in the time when the callback is executed the memory file
* is blocked for other subscribers and for writing publishers too. Maybe this can be eliminated
* by a better memory file read/write access implementation (lock free read) in future releases.
*
* Today, for specific scenarios (1:1 pub/sub connections with large payloads for example) this feature
* can increase the performance remarkable. But please keep in mind to return from the message callback function
* as fast as possible to not delay subsequent read/write access operations.
*
* By using the eCAL::CPayloadWriter API a full zero copy implementation is possible by providing separate methods
* for the initialization and the modification of the memory file content (see CPayloadWriter documentation).
*
*
* --------------------------------------------------------------------------------------------------------------
* Subscriber receive acknowledgement with timeout (SHM::Configuration::acknowledge_timeout_ms)
* --------------------------------------------------------------------------------------------------------------
*
* Most applications perform very well with the default behavior. If subscribers are too slow
* to process incoming messages then the overall software architecture needs to be checked, software components
* need to be optimized or parallelized.
*
* There may still be cases where it could make sense to synchronize the transfer of the payload from a publisher
* to a subscriber by using an additional handshake event. This event is signaled by a subscriber back to the
* sending publisher to confirm the complete payload transmission and the processed subscriber callback.
*
* The publisher will wait up to the specified timeout for the acknowledge signals of all connected subscribers
* before sending new content. Finally that means the publishers CPublisher::Send API function call is now blocked
* and will not return until all subscriber have read and processed their content or the timeout has been reached.
*
*
* --------------------------------------------------------------------------------------------------------------
* Number of handled memory files (SHM::Configuration::memfile_buffer_count)
* --------------------------------------------------------------------------------------------------------------
*
* By default, each publisher creates one memory file to distribute its payload to the subscribers. Since eCAL does not
* currently support a rw lock synchronisation mechanism for interprocess communication, reading subscribers are blocking
* the memory file and thus are preventing a publisher from writing the next payload into the file.
*
* This blocking behavior can be mitigated by using multiple memory files per publisher/subscriber connection. These memory
* files are then written in a kind of ring buffer.
*
* The disadvantage of this setting (memfile_buffer_count > 1) is the higher consumption of resources (memory files, events..)
*
**/

#pragma once

#include <ecal/ecal_tlayer.h>

#include <cstddef>

namespace eCAL
{
namespace Publisher
{
namespace SHM
{
struct ECAL_API Configuration
{
bool activate = false; //!< activate layer
bool zero_copy_mode = false; //!< enable zero copy shared memory transport mode
int acknowledge_timeout_ms = 0; /*!< force connected subscribers to send acknowledge event after processing the message
the publisher send call is blocked on this event with this timeout (0 == no handshake) */
size_t memfile_min_size_bytes = 4096; //!< default memory file size for new publisher
size_t memfile_reserve_percent = 50; //!< dynamic file size reserve before recreating memory file if topic size changes
size_t memfile_buffer_count = 1; //!< maximum number of used buffers (needs to be greater than 1, default = 1)
};
}

namespace UDP
{
struct ECAL_API Configuration
{
bool activate = false; //!< activate layer
int sndbuf_size_bytes = (5*1024*1024); //!< udp send buffer size in bytes (default 5MB)
};
}

namespace TCP
{
struct ECAL_API Configuration
{
bool activate = false; //!< activate layer
};
}

struct ECAL_API Configuration
{
Configuration();

SHM::Configuration shm;
UDP::Configuration udp;
TCP::Configuration tcp;

bool share_topic_type = true; //!< share topic type via registration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should discuss if we still support this for ecal 6. Also do we need to distinguish between type and descriptor? maybe only allow to switch of descriptor? (but not in this PR)

bool share_topic_description = true; //!< share topic description via registration
};
}
}
1 change: 1 addition & 0 deletions ecal/core/include/ecal/ecal_tlayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace eCAL
smode_auto
};

// TODO: Do weed need this ?
/**
* @brief eCAL transport layer state struct.
**/
Expand Down
20 changes: 11 additions & 9 deletions ecal/core/include/ecal/msg/capnproto/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ namespace eCAL
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param config_ Optional configuration parameters.
**/
CPublisher(const std::string& topic_name_)
: eCAL::CPublisher(topic_name_, GetDataTypeInformation())
CPublisher(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = {})
rex-schilasky marked this conversation as resolved.
Show resolved Hide resolved
: eCAL::CPublisher(topic_name_, GetDataTypeInformation(), config_)
, builder(std::make_unique<capnp::MallocMessageBuilder>())
, root_builder(builder->initRoot<message_type>())
{
Expand Down Expand Up @@ -145,12 +146,13 @@ namespace eCAL
* @brief Creates this object.
*
* @param topic_name_ Unique topic name.
* @param config_ Optional configuration parameters.
*
* @return True if it succeeds, false if it fails.
**/
bool Create(const std::string& topic_name_)
bool Create(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = {})
{
return(eCAL::CPublisher::Create(topic_name_, GetDataTypeInformation()));
return(eCAL::CPublisher::Create(topic_name_, GetDataTypeInformation(), config_));
}

typename message_type::Builder GetBuilder()
Expand All @@ -172,11 +174,11 @@ namespace eCAL
**/
SDataTypeInformation GetDataTypeInformation() const
{
SDataTypeInformation topic_info;
topic_info.encoding = eCAL::capnproto::EncodingAsString();
topic_info.name = eCAL::capnproto::TypeAsString<message_type>();
topic_info.descriptor = eCAL::capnproto::SchemaAsString<message_type>();
return topic_info;
SDataTypeInformation data_type_info;
data_type_info.encoding = eCAL::capnproto::EncodingAsString();
data_type_info.name = eCAL::capnproto::TypeAsString<message_type>();
data_type_info.descriptor = eCAL::capnproto::SchemaAsString<message_type>();
return data_type_info;
}

std::unique_ptr<capnp::MallocMessageBuilder> builder;
Expand Down
14 changes: 8 additions & 6 deletions ecal/core/include/ecal/msg/flatbuffers/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ namespace eCAL
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
* @param config_ Optional configuration parameters.
**/
CPublisher(const std::string& topic_name_) : CMsgPublisher<T>(topic_name_, GetDataTypeInformation())
CPublisher(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = {}) : CMsgPublisher<T>(topic_name_, GetDataTypeInformation(), config_)
{
}

Expand Down Expand Up @@ -80,12 +81,13 @@ namespace eCAL
* @brief Creates this object.
*
* @param topic_name_ Unique topic name.
* @param config_ Optional configuration parameters.
*
* @return True if it succeeds, false if it fails.
**/
bool Create(const std::string& topic_name_)
bool Create(const std::string& topic_name_, const eCAL::Publisher::Configuration& config_ = {})
{
return(CMsgPublisher<T>::Create(topic_name_, GetDataTypeInformation()));
return(CMsgPublisher<T>::Create(topic_name_, GetDataTypeInformation(), config_));
}

private:
Expand All @@ -96,10 +98,10 @@ namespace eCAL
**/
SDataTypeInformation GetDataTypeInformation() const override
{
SDataTypeInformation topic_info;
topic_info.encoding = "flatb";
SDataTypeInformation data_type_info;
data_type_info.encoding = "flatb";
// empty type, empty descriptor
return topic_info;
return data_type_info;
}

/**
Expand Down
Loading
Loading