Skip to content

Commit

Permalink
remove local connection logic removed from ecal_writer.cpp (these eve…
Browse files Browse the repository at this point in the history
…nts are closed on CSyncMemFile Destruction/Disconnect)

call AddLocConnection/AddExtConnection RemLocConnection/RemExtConnection for all layers (including tcp) even function are unimplemented
changed signature for all AddXXXConnection/RemXXXConnection functions to provide unambiguous information (host, process, topic)
  • Loading branch information
rex-schilasky committed Oct 16, 2023
1 parent 5ca0c8a commit 01f0f99
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 47 deletions.
29 changes: 13 additions & 16 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,9 @@ namespace eCAL
m_loc_subscribed = true;

// add a new local subscription
m_writer.udp_mc.AddLocConnection (local_info_.process_id, reader_par_);
m_writer.shm.AddLocConnection (local_info_.process_id, reader_par_);
m_writer.udp_mc.AddLocConnection (local_info_.process_id, local_info_.topic_id, reader_par_);
m_writer.shm.AddLocConnection (local_info_.process_id, local_info_.topic_id, reader_par_);
m_writer.tcp.AddLocConnection(local_info_.process_id, local_info_.topic_id, reader_par_);

#ifndef NDEBUG
// log it
Expand All @@ -686,8 +687,9 @@ namespace eCAL
}

// remove a local subscription
m_writer.udp_mc.RemLocConnection (local_info_.process_id);
m_writer.shm.RemLocConnection (local_info_.process_id);
m_writer.udp_mc.RemLocConnection (local_info_.process_id, local_info_.topic_id);
m_writer.shm.RemLocConnection (local_info_.process_id, local_info_.topic_id);
m_writer.tcp.RemLocConnection (local_info_.process_id, local_info_.topic_id);

#ifndef NDEBUG
// log it
Expand All @@ -708,8 +710,9 @@ namespace eCAL
m_ext_subscribed = true;

// add a new external subscription
m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);
m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, reader_par_);
m_writer.udp_mc.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_);
m_writer.shm.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_);
m_writer.tcp.AddExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id, reader_par_);

#ifndef NDEBUG
// log it
Expand All @@ -726,8 +729,9 @@ namespace eCAL
}

// remove external subscription
m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id);
m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id);
m_writer.udp_mc.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id);
m_writer.shm.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id);
m_writer.tcp.RemExtConnection (external_info_.host_name, external_info_.process_id, external_info_.topic_id);
}

void CDataWriter::RefreshRegistration()
Expand Down Expand Up @@ -764,22 +768,15 @@ namespace eCAL
Register(false);

// check connection timeouts
// Todo: Why are only Local connections removed, not external connections?
const std::shared_ptr<std::list<SLocalSubscriptionInfo>> loc_timeouts = std::make_shared<std::list<SLocalSubscriptionInfo>>();
{
const std::lock_guard<std::mutex> lock(m_sub_map_sync);
m_loc_sub_map.remove_deprecated(loc_timeouts.get());
m_loc_sub_map.remove_deprecated();
m_ext_sub_map.remove_deprecated();

m_loc_subscribed = !m_loc_sub_map.empty();
m_ext_subscribed = !m_ext_sub_map.empty();
}

for(const auto& loc_sub : *loc_timeouts)
{
m_writer.shm.RemLocConnection(loc_sub.process_id);
}

if (!m_loc_subscribed && !m_ext_subscribed)
{
Disconnect();
Expand Down
8 changes: 4 additions & 4 deletions ecal/core/src/readwrite/ecal_writer_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ namespace eCAL
virtual bool SetQOS(const QOS::SWriterQOS& qos_) { m_qos = qos_; return true; };
QOS::SWriterQOS GetQOS() { return(m_qos); };

virtual bool AddLocConnection(const std::string& /*process_id_*/, const std::string& /*conn_par_*/) { return false; };
virtual bool RemLocConnection(const std::string& /*process_id_*/) { return false; };
virtual bool AddLocConnection(const std::string& /*process_id_*/, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) { return false; };
virtual bool RemLocConnection(const std::string& /*process_id_*/, const std::string& /*topic_id_*/) { return false; };

virtual bool AddExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*conn_par_*/) { return false; };
virtual bool RemExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/) { return false; };
virtual bool AddExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/) { return false; };
virtual bool RemExtConnection(const std::string& /*host_name_*/, const std::string& /*process_id_*/, const std::string& /*topic_id_*/) { return false; };

virtual std::string GetConnectionParameter() { return ""; };

Expand Down
24 changes: 1 addition & 23 deletions ecal/core/src/readwrite/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ namespace eCAL
return sent;
}

bool CDataWriterSHM::AddLocConnection(const std::string& process_id_, const std::string& /*conn_par_*/)
bool CDataWriterSHM::AddLocConnection(const std::string& process_id_, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/)
{
if (!m_created) return false;
bool ret_state(true);
Expand All @@ -190,28 +190,6 @@ namespace eCAL
return ret_state;
}

bool CDataWriterSHM::RemLocConnection(const std::string& process_id_)
{
if (!m_created) return false;
bool ret_state(true);

for (auto& memory_file : m_memory_file_vec)
{
// This is not working correctly under POSIX for memory files that are read and written within the same process.
//
// The functions 'CSyncMemoryFile::Disconnect' and 'CDataWriterSHM::RemLocConnection' are now called
// by the new Subscriber Unregistration event logic and were never called in any previous eCAL version.
//
// TODO: Fix this in 'CSyncMemoryFile::Disconnect' to handle event resources properly.
if (std::to_string(eCAL::Process::GetProcessID()) != process_id_)
{
ret_state &= memory_file->Disconnect(process_id_);
}
}

return ret_state;
}

std::string CDataWriterSHM::GetConnectionParameter()
{
// starting from eCAL version > 5.8.13/5.9.0 the ConnectionParameter is defined as google protobuf
Expand Down
3 changes: 1 addition & 2 deletions ecal/core/src/readwrite/ecal_writer_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ namespace eCAL

bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override;

bool AddLocConnection(const std::string& process_id_, const std::string& conn_par_) override;
bool RemLocConnection(const std::string& process_id_) override;
bool AddLocConnection(const std::string& process_id_, const std::string& /*topic_id_*/, const std::string& conn_par_) override;

std::string GetConnectionParameter() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

void OnEvent(const char* topic_name_, const struct eCAL::SSubEventCallbackData* data_)
{
std::cout << "topic name : " << topic_name_ << std::endl;
std::cout << "topic name : " << topic_name_ << std::endl;
switch (data_->type)
{
case sub_event_connected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

void OnEvent(const char* topic_name_, const struct eCAL::SPubEventCallbackData* data_)
{
std::cout << "topic name : " << topic_name_ << std::endl;
std::cout << "topic name : " << topic_name_ << std::endl;
switch (data_->type)
{
case pub_event_connected:
Expand Down

0 comments on commit 01f0f99

Please sign in to comment.