Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed May 16, 2024
1 parent f7fa4aa commit 7b2155d
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 146 deletions.
158 changes: 68 additions & 90 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ namespace eCAL
// register
Register(false);

// enable transport layers
EnableUdpLayer(config_.udp.enable);
EnableShmLayer(config_.shm.enable);
EnableTcpLayer(config_.tcp.enable);
// start udp, shm, tcp layer
StartTransportLayer();
}

CDataWriter::~CDataWriter()
Expand All @@ -123,20 +121,8 @@ namespace eCAL
Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Stop");
#endif

// destroy udp multicast writer
#if ECAL_CORE_TRANSPORT_UDP
m_writer_udp.reset();
#endif

// destroy memory file writer
#if ECAL_CORE_TRANSPORT_SHM
m_writer_shm.reset();
#endif

// destroy tcp writer
#if ECAL_CORE_TRANSPORT_TCP
m_writer_tcp.reset();
#endif
// stop udp, shm, tcp layer
StopTransportLayer();

// clear subscriber maps
{
Expand Down Expand Up @@ -259,10 +245,10 @@ namespace eCAL

// can we do a zero copy write ?
const bool allow_zero_copy =
m_config.shm.zero_copy_mode // zero copy mode activated by user
&& m_config.shm.enable // shm layer active
&& !m_config.udp.enable // udp layer inactive
&& !m_config.tcp.enable; // tcp layer inactive
m_config.shm.zero_copy_mode // zero copy mode activated by user
&& m_writer_shm // shm layer active
&& !m_writer_udp // udp layer inactive
&& !m_writer_tcp; // tcp layer inactive

// create a payload copy for all layer
if (!allow_zero_copy)
Expand All @@ -281,7 +267,7 @@ namespace eCAL
// SHM
////////////////////////////////////////////////////////////////////////////
#if ECAL_CORE_TRANSPORT_SHM
if (m_writer_shm && m_config.shm.enable)
if (m_writer_shm)
{
#ifndef NDEBUG
// log it
Expand Down Expand Up @@ -346,7 +332,7 @@ namespace eCAL
// UDP (MC)
////////////////////////////////////////////////////////////////////////////
#if ECAL_CORE_TRANSPORT_UDP
if (m_writer_udp && m_config.udp.enable)
if (m_writer_udp)
{
#ifndef NDEBUG
// log it
Expand Down Expand Up @@ -397,7 +383,7 @@ namespace eCAL
// TCP
////////////////////////////////////////////////////////////////////////////
#if ECAL_CORE_TRANSPORT_TCP
if (m_writer_tcp && m_config.tcp.enable)
if (m_writer_tcp)
{
#ifndef NDEBUG
// log it
Expand Down Expand Up @@ -736,79 +722,88 @@ namespace eCAL
}
}

void CDataWriter::EnableUdpLayer(bool state_)
void CDataWriter::StartTransportLayer()
{
#if ECAL_CORE_TRANSPORT_UDP
m_config.udp.enable = state_;
if (!m_created) return;

// log state
LogLayerState(state_, "CDataWriter::ActivateUdpLayer::UDP_SENDMODE");

if (state_)
if (m_config.udp.enable)
{
m_writer_udp = std::make_unique<CDataWriterUdpMC>(m_host_name, m_topic_name, m_topic_id, m_config.udp);
#ifndef NDEBUG
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::UDP_WRITER");
ActivateUdpLayer();
}
#endif
#if ECAL_CORE_TRANSPORT_SHM
if (m_config.shm.enable)
{
ActivateShmLayer();
}
else
#endif
#if ECAL_CORE_TRANSPORT_TCP
if (m_config.tcp.enable)
{
m_writer_udp.reset();
ActivateTcpLayer();
}
#else // ECAL_CORE_TRANSPORT_UDP
(void)state_;
#endif // ECAL_CORE_TRANSPORT_UDP
#endif
}

void CDataWriter::EnableShmLayer(bool state_)
void CDataWriter::StopTransportLayer()
{
// destroy udp writer
#if ECAL_CORE_TRANSPORT_UDP
m_writer_udp.reset();
#endif

// destroy shm writer
#if ECAL_CORE_TRANSPORT_SHM
m_config.shm.enable = state_;
if (!m_created) return;
m_writer_shm.reset();
#endif

// destroy tcp writer
#if ECAL_CORE_TRANSPORT_TCP
m_writer_tcp.reset();
#endif
}

void CDataWriter::ActivateUdpLayer()
{
#if ECAL_CORE_TRANSPORT_UDP
// log state
LogLayerState(state_, "CDataWriter::ActivateShmLayer::SHM_SENDMODE");
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::ACTIVATED");

// create writer
m_writer_udp = std::make_unique<CDataWriterUdpMC>(m_host_name, m_topic_name, m_topic_id, m_config.udp);

if (state_)
{
m_writer_shm = std::make_unique<CDataWriterSHM>(m_host_name, m_topic_name, m_topic_id, m_config.shm);
m_writer_shm->SetBufferCount(m_config.shm.memfile_buffer_count);
#ifndef NDEBUG
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::SHM_WRITER");
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::WRITER_CREATED");
#endif
#endif // ECAL_CORE_TRANSPORT_UDP
}

void CDataWriter::ActivateShmLayer()
{
#if ECAL_CORE_TRANSPORT_SHM
// log state
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::ACTIVATED");

// create writer
m_writer_shm = std::make_unique<CDataWriterSHM>(m_host_name, m_topic_name, m_topic_id, m_config.shm);

#ifndef NDEBUG
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::WRITER_CREATED");
#endif
}
else
{
m_writer_shm.reset();
}
#else // ECAL_CORE_TRANSPORT_SHM
(void)state_;
#endif // ECAL_CORE_TRANSPORT_SHM
}

void CDataWriter::EnableTcpLayer(bool state_)
void CDataWriter::ActivateTcpLayer()
{
#if ECAL_CORE_TRANSPORT_TCP
m_config.tcp.enable = state_;
if (!m_created) return;

// log state
LogLayerState(state_, "CDataWriter::ActivateTcpLayer::TCP_SENDMODE");
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::ACTIVATED");

// create writer
m_writer_tcp = std::make_unique<CDataWriterTCP>(m_host_name, m_topic_name, m_topic_id, m_config.tcp);

if (state_)
{
m_writer_tcp = std::make_unique<CDataWriterTCP>(m_host_name, m_topic_name, m_topic_id, m_config.tcp);
#ifndef NDEBUG
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::TCP_WRITER - SUCCESS");
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::WRITER_CREATED");
#endif
}
else
{
m_writer_tcp.reset();
}
#else // ECAL_CORE_TRANSPORT_TCP
(void)state_;
#endif // ECAL_CORE_TRANSPORT_TCP
}

Expand Down Expand Up @@ -847,23 +842,6 @@ namespace eCAL
return is_internal_only;
}

void CDataWriter::LogLayerState(bool state_, const std::string& base_msg_)
{
#ifndef NDEBUG
if (state_)
{
Logging::Log(log_level_debug4, m_topic_name + "::" + base_msg_ + "::ON");
}
else
{
Logging::Log(log_level_debug4, m_topic_name + "::" + base_msg_ + "::OFF");
}
#else
(void)state_;
(void)base_msg_;
#endif
}

int32_t CDataWriter::GetFrequency()
{
const auto frequency_time = std::chrono::steady_clock::now();
Expand Down
10 changes: 6 additions & 4 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ namespace eCAL
void Connect(const std::string& tid_, const SDataTypeInformation& tinfo_);
void Disconnect();

void EnableUdpLayer(bool state_);
void EnableShmLayer(bool state_);
void EnableTcpLayer(bool state_);
void StartTransportLayer();
void StopTransportLayer();

void ActivateUdpLayer();
void ActivateShmLayer();
void ActivateTcpLayer();

size_t PrepareWrite(long long id_, size_t len_);
bool IsInternalSubscribedOnly();
void LogLayerState(bool state_, const std::string& base_msg_);

int32_t GetFrequency();

Expand Down
100 changes: 50 additions & 50 deletions ecal/core/src/readwrite/shm/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,56 +56,6 @@ namespace eCAL

return info_;
}

bool CDataWriterSHM::SetBufferCount(size_t buffer_count_)
{
// no need to adapt anything
if (m_memory_file_vec.size() == buffer_count_) return true;

// buffer count zero not allowed
if (buffer_count_ < 1)
{
Logging::Log(log_level_error, m_topic_name + "::CDataWriterSHM::SetBufferCount minimal number of memory files is 1 !");
return false;
}

// prepare memfile attributes
SSyncMemoryFileAttr memory_file_attr = {};
memory_file_attr.min_size = m_config.memfile_min_size_bytes;
memory_file_attr.reserve = m_config.memfile_reserve_percent;
memory_file_attr.timeout_open_ms = PUB_MEMFILE_OPEN_TO;
memory_file_attr.timeout_ack_ms = m_config.acknowledge_timeout_ms;

// retrieve the memory file size of existing files
size_t memory_file_size(0);
if (!m_memory_file_vec.empty())
{
memory_file_size = m_memory_file_vec[0]->GetSize();
}
else
{
memory_file_size = memory_file_attr.min_size;
}

// create memory file vector
m_memory_file_vec.clear();
while (m_memory_file_vec.size() < buffer_count_)
{
auto sync_memfile = std::make_shared<CSyncMemoryFile>(m_memfile_base_name, memory_file_size, memory_file_attr);
if (sync_memfile->IsCreated())
{
m_memory_file_vec.push_back(sync_memfile);
}
else
{
m_memory_file_vec.clear();
Logging::Log(log_level_error, "CDataWriterSHM::SetBufferCount - FAILED");
return false;
}
}

return true;
}

bool CDataWriterSHM::PrepareWrite(const SWriterAttr& attr_)
{
Expand Down Expand Up @@ -158,4 +108,54 @@ namespace eCAL
}
return connection_par;
}

bool CDataWriterSHM::SetBufferCount(size_t buffer_count_)
{
// no need to adapt anything
if (m_memory_file_vec.size() == buffer_count_) return true;

// buffer count zero not allowed
if (buffer_count_ < 1)
{
Logging::Log(log_level_error, m_topic_name + "::CDataWriterSHM::SetBufferCount minimal number of memory files is 1 !");
return false;
}

// prepare memfile attributes
SSyncMemoryFileAttr memory_file_attr = {};
memory_file_attr.min_size = m_config.memfile_min_size_bytes;
memory_file_attr.reserve = m_config.memfile_reserve_percent;
memory_file_attr.timeout_open_ms = PUB_MEMFILE_OPEN_TO;
memory_file_attr.timeout_ack_ms = m_config.acknowledge_timeout_ms;

// retrieve the memory file size of existing files
size_t memory_file_size(0);
if (!m_memory_file_vec.empty())
{
memory_file_size = m_memory_file_vec[0]->GetSize();
}
else
{
memory_file_size = memory_file_attr.min_size;
}

// create memory file vector
m_memory_file_vec.clear();
while (m_memory_file_vec.size() < buffer_count_)
{
auto sync_memfile = std::make_shared<CSyncMemoryFile>(m_memfile_base_name, memory_file_size, memory_file_attr);
if (sync_memfile->IsCreated())
{
m_memory_file_vec.push_back(sync_memfile);
}
else
{
m_memory_file_vec.clear();
Logging::Log(log_level_error, "CDataWriterSHM::SetBufferCount - FAILED");
return false;
}
}

return true;
}
}
4 changes: 2 additions & 2 deletions ecal/core/src/readwrite/shm/ecal_writer_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ namespace eCAL

SWriterInfo GetInfo() override;

bool SetBufferCount(size_t buffer_count_);

bool PrepareWrite(const SWriterAttr& attr_) override;

bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override;
Expand All @@ -53,6 +51,8 @@ namespace eCAL
Registration::ConnectionPar GetConnectionParameter() override;

protected:
bool SetBufferCount(size_t buffer_count_);

Publisher::SHM::Configuration m_config;

size_t m_write_idx = 0;
Expand Down

0 comments on commit 7b2155d

Please sign in to comment.