Skip to content

Commit

Permalink
[core] add publisher registration event callback API (#1717)
Browse files Browse the repository at this point in the history
* add/rem publisher registration event callback API
* add/rem subscriber registration event callback API
  • Loading branch information
rex-schilasky authored Aug 29, 2024
1 parent 462fc4b commit b130791
Show file tree
Hide file tree
Showing 12 changed files with 606 additions and 99 deletions.
48 changes: 48 additions & 0 deletions ecal/core/include/ecal/ecal_registration.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <ecal/ecal_types.h>

#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
Expand Down Expand Up @@ -94,6 +95,17 @@ namespace eCAL
using QualityServiceInfoMultimap = std::multimap<SServiceMethod, SQualityServiceInfo>;
using SQualityServiceInfoSet = std::set<SQualityServiceInfo>;

using CallbackToken = std::size_t;

enum class RegistrationEventType
{
new_entity, //!< Represents a new entity registration
deleted_entity //!< Represents a deletion of an entity
};

using TopicIDCallbackT = std::function<void(const STopicId&, RegistrationEventType)>;
using ServiceIDCallbackT = std::function<void(const SServiceId&, RegistrationEventType)>;

/**
* @brief Get complete snapshot of all known publisher.
*
Expand All @@ -108,6 +120,24 @@ namespace eCAL
**/
ECAL_API bool GetPublisherInfo(const STopicId& id_, SQualityTopicInfo& topic_info_);

/**
* @brief Register a callback function to be notified when a new publisher becomes available.
*
* @param callback_ The callback function to be called with the STopicId of the new publisher.
* The callback function must not be blocked for a longer period of time,
* otherwise timeout mechanisms of the eCAL registration would be triggered.
*
* @return CallbackToken Token that can be used to unregister the callback.
*/
ECAL_API CallbackToken AddPublisherEventCallback(const TopicIDCallbackT& callback_);

/**
* @brief Unregister the publisher callback using the provided token.
*
* @param token The token returned by AddPublisherCallback.
*/
ECAL_API void RemPublisherEventCallback(CallbackToken token_);

/**
* @brief Get complete snapshot of all known subscriber.
*
Expand All @@ -122,6 +152,24 @@ namespace eCAL
**/
ECAL_API bool GetSubscriberInfo(const STopicId& id_, SQualityTopicInfo& topic_info_);

/**
* @brief Register a callback function to be notified when a new subscriber becomes available.
*
* @param callback_ The callback function to be called with the STopicId of the new subscriber.
* The callback function must not be blocked for a longer period of time,
* otherwise timeout mechanisms of the eCAL registration would be triggered.
*
* @return CallbackToken Token that can be used to unregister the callback.
*/
ECAL_API CallbackToken AddSubscriberEventCallback(const TopicIDCallbackT& callback_);

/**
* @brief Unregister the subscriber callback using the provided token.
*
* @param token The token returned by AddSubscriberCallback.
*/
ECAL_API void RemSubscriberEventCallback(CallbackToken token_);

/**
* @brief Get complete snapshot of all known services.
*
Expand Down
108 changes: 94 additions & 14 deletions ecal/core/src/ecal_descgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ namespace eCAL
return GetTopic(id_, m_publisher_info_map, topic_info_);
}

Registration::CallbackToken CDescGate::AddPublisherEventCallback(const Registration::TopicIDCallbackT& callback_)
{
const std::lock_guard<std::mutex> lock(m_publisher_callback_map.mtx);

const Registration::CallbackToken new_token = CreateToken();
m_publisher_callback_map.map[new_token] = callback_;

return new_token;
}

void CDescGate::RemPublisherEventCallback(Registration::CallbackToken token_)
{
const std::lock_guard<std::mutex> lock(m_publisher_callback_map.mtx);
m_publisher_callback_map.map.erase(token_);
}

std::set<Registration::STopicId> CDescGate::GetSubscriberIDs() const
{
return GetTopicIDs(m_subscriber_info_map);
Expand All @@ -74,6 +90,22 @@ namespace eCAL
return GetTopic(id_, m_subscriber_info_map, topic_info_);
}

Registration::CallbackToken CDescGate::AddSubscriberEventCallback(const Registration::TopicIDCallbackT& callback_)
{
const std::lock_guard<std::mutex> lock(m_subscriber_callback_map.mtx);

const Registration::CallbackToken new_token = CreateToken();
m_subscriber_callback_map.map[new_token] = callback_;

return new_token;
}

void CDescGate::RemSubscriberEventCallback(Registration::CallbackToken token_)
{
const std::lock_guard<std::mutex> lock(m_subscriber_callback_map.mtx);
m_subscriber_callback_map.map.erase(token_);
}

std::set<Registration::SServiceId> CDescGate::GetServiceIDs() const
{
return GetServiceIDs(m_service_info_map);
Expand Down Expand Up @@ -126,7 +158,7 @@ namespace eCAL
std::set<Registration::SServiceId> service_id_set;

const std::lock_guard<std::mutex> lock(service_method_info_map_.mtx);
for (const auto& service_method_info_map_it : service_method_info_map_.map)
for (const auto& service_method_info_map_it : service_method_info_map_.id_map)
{
service_id_set.insert(service_method_info_map_it.first);
}
Expand All @@ -136,8 +168,8 @@ namespace eCAL
bool CDescGate::GetService(const Registration::SServiceId& id_, const SQualityServiceIdMap& service_method_info_map_, Registration::SQualityServiceInfo& service_method_info_)
{
const std::lock_guard<std::mutex> lock(service_method_info_map_.mtx);
auto iter = service_method_info_map_.map.find(id_);
if (iter == service_method_info_map_.map.end())
auto iter = service_method_info_map_.id_map.find(id_);
if (iter == service_method_info_map_.id_map.end())
{
return false;
}
Expand Down Expand Up @@ -194,16 +226,16 @@ namespace eCAL
RemServiceDescription(m_client_info_map, sample_.identifier, sample_.client.sname);
break;
case bct_reg_publisher:
ApplyTopicDescription(m_publisher_info_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, true));
ApplyTopicDescription(m_publisher_info_map, m_publisher_callback_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, true));
break;
case bct_unreg_publisher:
RemTopicDescription(m_publisher_info_map, sample_.identifier, sample_.topic.tname);
RemTopicDescription(m_publisher_info_map, m_publisher_callback_map, sample_.identifier, sample_.topic.tname);
break;
case bct_reg_subscriber:
ApplyTopicDescription(m_subscriber_info_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, false));
ApplyTopicDescription(m_subscriber_info_map, m_subscriber_callback_map, sample_.identifier, sample_.topic.tname, sample_.topic.tdatatype, GetDataTypeInfoQuality(sample_.topic.tdatatype, false));
break;
case bct_unreg_subscriber:
RemTopicDescription(m_subscriber_info_map, sample_.identifier, sample_.topic.tname);
RemTopicDescription(m_subscriber_info_map, m_subscriber_callback_map, sample_.identifier, sample_.topic.tname);
break;
default:
{
Expand All @@ -214,6 +246,7 @@ namespace eCAL
}

void CDescGate::ApplyTopicDescription(SQualityTopicIdMap& topic_info_map_,
const STopicIdCallbackMap& topic_callback_map_,
const Registration::SampleIdentifier& topic_id_,
const std::string& topic_name_,
const SDataTypeInformation& topic_info_,
Expand All @@ -225,16 +258,55 @@ namespace eCAL
topic_quality_info.info = topic_info_;
topic_quality_info.quality = topic_quality_;

const std::unique_lock<std::mutex> lock(topic_info_map_.mtx);
topic_info_map_.map[topic_info_key] = topic_quality_info;
// update topic info
bool new_topic_info(false);
{
const std::unique_lock<std::mutex> lock(topic_info_map_.mtx);
const auto iter = topic_info_map_.map.find(topic_info_key);
new_topic_info = iter == topic_info_map_.map.end();
topic_info_map_.map[topic_info_key] = topic_quality_info;
}

// notify publisher / subscriber registration callbacks about new entity
if(new_topic_info)
{
const std::unique_lock<std::mutex> lock(topic_callback_map_.mtx);
for (const auto& callback_iter : topic_callback_map_.map)
{
if (callback_iter.second)
{
callback_iter.second(topic_info_key, Registration::RegistrationEventType::new_entity);
}
}
}
}

void CDescGate::RemTopicDescription(SQualityTopicIdMap& topic_info_map_,
const STopicIdCallbackMap& topic_callback_map_,
const Registration::SampleIdentifier& topic_id_,
const std::string& topic_name_)
{
const std::unique_lock<std::mutex> lock(topic_info_map_.mtx);
topic_info_map_.map.erase(Registration::STopicId{ ConvertToEntityId(topic_id_) , topic_name_});
const auto topic_info_key = Registration::STopicId{ ConvertToEntityId(topic_id_), topic_name_ };

// delete topic info
bool deleted_topic_info(false);
{
const std::unique_lock<std::mutex> lock(topic_info_map_.mtx);
deleted_topic_info = topic_info_map_.map.erase(topic_info_key) > 0;
}

// notify publisher / subscriber registration callbacks about deleted entity
if (deleted_topic_info)
{
const std::unique_lock<std::mutex> lock(topic_callback_map_.mtx);
for (const auto& callback_iter : topic_callback_map_.map)
{
if (callback_iter.second)
{
callback_iter.second(topic_info_key, Registration::RegistrationEventType::deleted_entity);
}
}
}
}

void CDescGate::ApplyServiceDescription(SQualityServiceIdMap& service_method_info_map_,
Expand All @@ -255,7 +327,7 @@ namespace eCAL
service_quality_info.response_quality = response_type_quality_;

const std::lock_guard<std::mutex> lock(service_method_info_map_.mtx);
service_method_info_map_.map[service_method_info_key] = service_quality_info;
service_method_info_map_.id_map[service_method_info_key] = service_quality_info;
}

void CDescGate::RemServiceDescription(SQualityServiceIdMap& service_method_info_map_,
Expand All @@ -266,7 +338,7 @@ namespace eCAL

const std::lock_guard<std::mutex> lock(service_method_info_map_.mtx);

for (auto&& service_it : service_method_info_map_.map)
for (auto&& service_it : service_method_info_map_.id_map)
{
const auto service_method_info_key = service_it.first;
if ((service_method_info_key.service_name == service_name_)
Expand All @@ -278,7 +350,15 @@ namespace eCAL

for (const auto& service_method_info_key : service_method_info_keys_to_remove)
{
service_method_info_map_.map.erase(service_method_info_key);
service_method_info_map_.id_map.erase(service_method_info_key);
}
}

Registration::CallbackToken CDescGate::CreateToken()
{
// Atomically increment m_callback_token using fetch_add to ensure thread safety.
// fetch_add returns the value before increment, so we add 1 to get the new token value.
// memory_order_relaxed is used to optimize performance without additional synchronization.
return m_callback_token.fetch_add(1, std::memory_order_relaxed) + 1;
}
}
26 changes: 24 additions & 2 deletions ecal/core/src/ecal_descgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "serialization/ecal_struct_sample_registration.h"

#include <atomic>
#include <chrono>
#include <map>
#include <mutex>
Expand All @@ -49,10 +50,14 @@ namespace eCAL
// get publisher information
std::set<Registration::STopicId> GetPublisherIDs() const;
bool GetPublisherInfo(const Registration::STopicId& id_, Registration::SQualityTopicInfo& topic_info_) const;
Registration::CallbackToken AddPublisherEventCallback(const Registration::TopicIDCallbackT& callback_);
void RemPublisherEventCallback(Registration::CallbackToken token_);

// get subscriber information
std::set<Registration::STopicId> GetSubscriberIDs() const;
bool GetSubscriberInfo(const Registration::STopicId& id_, Registration::SQualityTopicInfo& topic_info_) const;
Registration::CallbackToken AddSubscriberEventCallback(const Registration::TopicIDCallbackT& callback_);
void RemSubscriberEventCallback(Registration::CallbackToken token_);

// get service information
std::set<Registration::SServiceId> GetServiceIDs() const;
Expand All @@ -71,18 +76,25 @@ namespace eCAL
CDescGate& operator=(CDescGate&&) = delete;

protected:
using QualityTopicIdMap = std::map<Registration::STopicId, Registration::SQualityTopicInfo>;
using QualityTopicIdMap = std::map<Registration::STopicId, Registration::SQualityTopicInfo>;
struct SQualityTopicIdMap
{
mutable std::mutex mtx;
QualityTopicIdMap map;
};

using TopicIdCallbackMap = std::map<Registration::CallbackToken, Registration::TopicIDCallbackT>;
struct STopicIdCallbackMap
{
mutable std::mutex mtx;
TopicIdCallbackMap map;
};

using QualityServiceIdMap = std::map<Registration::SServiceId, Registration::SQualityServiceInfo>;
struct SQualityServiceIdMap
{
mutable std::mutex mtx;
QualityServiceIdMap map;
QualityServiceIdMap id_map;
};

static std::set<Registration::STopicId> GetTopicIDs(const SQualityTopicIdMap& topic_info_map_);
Expand All @@ -92,12 +104,14 @@ namespace eCAL
static bool GetService (const Registration::SServiceId& id_, const SQualityServiceIdMap& service_method_info_map_, Registration::SQualityServiceInfo& service_method_info_);

static void ApplyTopicDescription(SQualityTopicIdMap& topic_info_map_,
const STopicIdCallbackMap& topic_callback_map_,
const Registration::SampleIdentifier& topic_id_,
const std::string& topic_name_,
const SDataTypeInformation& topic_info_,
Registration::DescQualityFlags topic_quality_);

static void RemTopicDescription(SQualityTopicIdMap& topic_info_map_,
const STopicIdCallbackMap& topic_callback_map_,
const Registration::SampleIdentifier& topic_id_,
const std::string& topic_name_);

Expand All @@ -114,12 +128,20 @@ namespace eCAL
const Registration::SampleIdentifier& service_id_,
const std::string& service_name_);

Registration::CallbackToken CreateToken();

// internal quality topic info publisher/subscriber maps
SQualityTopicIdMap m_publisher_info_map;
STopicIdCallbackMap m_publisher_callback_map;

SQualityTopicIdMap m_subscriber_info_map;
STopicIdCallbackMap m_subscriber_callback_map;

// internal quality service info service/client maps
SQualityServiceIdMap m_service_info_map;
SQualityServiceIdMap m_client_info_map;

mutable std::mutex m_callback_token_mtx;
std::atomic<Registration::CallbackToken> m_callback_token{ 0 };
};
}
24 changes: 24 additions & 0 deletions ecal/core/src/registration/ecal_registration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ namespace eCAL
return g_descgate()->GetPublisherInfo(id_, topic_info_);
}

ECAL_API CallbackToken AddPublisherEventCallback(const TopicIDCallbackT& callback_)
{
if (g_descgate() == nullptr) return CallbackToken();
return g_descgate()->AddPublisherEventCallback(callback_);
}

ECAL_API void RemPublisherEventCallback(CallbackToken token_)
{
if (g_descgate() == nullptr) return;
return g_descgate()->RemPublisherEventCallback(token_);
}

std::set<STopicId> GetSubscriberIDs()
{
if (g_descgate() == nullptr) return std::set<STopicId>();
Expand All @@ -185,6 +197,18 @@ namespace eCAL
return g_descgate()->GetSubscriberInfo(id_, topic_info_);
}

ECAL_API CallbackToken AddSubscriberEventCallback(const TopicIDCallbackT& callback_)
{
if (g_descgate() == nullptr) return CallbackToken();
return g_descgate()->AddSubscriberEventCallback(callback_);
}

ECAL_API void RemSubscriberEventCallback(CallbackToken token_)
{
if (g_descgate() == nullptr) return;
return g_descgate()->RemSubscriberEventCallback(token_);
}

std::set<SServiceId> GetServiceIDs()
{
if (g_descgate() == nullptr) return std::set<SServiceId>();
Expand Down
Loading

0 comments on commit b130791

Please sign in to comment.