Skip to content

Commit

Permalink
Private discovery msg.
Browse files Browse the repository at this point in the history
Signed-off-by: Carlos Agüero <[email protected]>
  • Loading branch information
caguero committed Feb 1, 2024
1 parent 764a04b commit 77c2c47
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 47 deletions.
75 changes: 38 additions & 37 deletions include/gz/transport/Discovery.hh
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
#pragma warning(disable: 4996)
#endif

#include <gz/msgs/discovery.pb.h>

#include <algorithm>
#include <condition_variable>
#include <limits>
Expand All @@ -70,6 +68,7 @@

#include <gz/msgs/Utility.hh>

#include "gz/transport/private_msgs/discovery.pb.h"
#include "gz/transport/config.hh"
#include "gz/transport/Export.hh"
#include "gz/transport/Helpers.hh"
Expand Down Expand Up @@ -262,7 +261,7 @@ namespace gz

// Broadcast a BYE message to trigger the remote cancellation of
// all our advertised topics.
this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
this->SendMsg(DestinationType::ALL, private_msgs::Discovery::BYE,
Publisher("", "", this->pUuid, "", AdvertiseOptions()));

// Close sockets.
Expand Down Expand Up @@ -320,8 +319,8 @@ namespace gz
// Only advertise a message outside this process if the scope
// is not 'Process'
if (_publisher.Options().Scope() != Scope_t::PROCESS)
this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
_publisher);
this->SendMsg(DestinationType::ALL,
private_msgs::Discovery::ADVERTISE, _publisher);

return true;
}
Expand Down Expand Up @@ -356,7 +355,8 @@ namespace gz
pub.SetPUuid(this->pUuid);

// Send a discovery request.
this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
this->SendMsg(DestinationType::ALL, private_msgs::Discovery::SUBSCRIBE,
pub);

{
std::lock_guard<std::mutex> lock(this->mutex);
Expand Down Expand Up @@ -389,23 +389,23 @@ namespace gz
public: void SendSubscribersRep(const MessagePublisher &_pub) const
{
this->SendMsg(
DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
DestinationType::ALL, private_msgs::Discovery::SUBSCRIBERS_REP, _pub);
}

/// \brief Register a node from this process as a remote subscriber.
/// \param[in] _pub Contains information about the subscriber.
public: void Register(const MessagePublisher &_pub) const
{
this->SendMsg(
DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
DestinationType::ALL, private_msgs::Discovery::NEW_CONNECTION, _pub);
}

/// \brief Unregister a node from this process as a remote subscriber.
/// \param[in] _pub Contains information about the subscriber.
public: void Unregister(const MessagePublisher &_pub) const
{
this->SendMsg(
DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
DestinationType::ALL, private_msgs::Discovery::END_CONNECTION, _pub);
}

/// \brief Get the discovery information.
Expand Down Expand Up @@ -468,7 +468,7 @@ namespace gz
if (inf.Options().Scope() != Scope_t::PROCESS)
{
this->SendMsg(DestinationType::ALL,
msgs::Discovery::UNADVERTISE, inf);
private_msgs::Discovery::UNADVERTISE, inf);
}

return true;
Expand Down Expand Up @@ -642,7 +642,7 @@ namespace gz
// Request the list of subscribers.
Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
this->SendMsg(
DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);
DestinationType::ALL, private_msgs::Discovery::SUBSCRIBERS_REQ, pub);

this->WaitForInit();
std::lock_guard<std::mutex> lock(this->mutex);
Expand Down Expand Up @@ -746,7 +746,8 @@ namespace gz
}

Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
this->SendMsg(DestinationType::ALL, private_msgs::Discovery::HEARTBEAT,
pub);

std::map<std::string, std::vector<Pub>> nodes;
{
Expand All @@ -761,7 +762,7 @@ namespace gz
for (const auto &node : topic.second)
{
this->SendMsg(DestinationType::ALL,
msgs::Discovery::ADVERTISE, node);
private_msgs::Discovery::ADVERTISE, node);
}
}

Expand Down Expand Up @@ -903,7 +904,7 @@ namespace gz
private: void DispatchDiscoveryMsg(const std::string &_fromIp,
char *_msg, uint16_t _len)
{
gz::msgs::Discovery msg;
private_msgs::Discovery msg;

// Parse the message, and return if parsing failed. Parsing could
// fail when another discovery node is publishing messages using an
Expand Down Expand Up @@ -975,7 +976,7 @@ namespace gz

switch (msg.type())
{
case msgs::Discovery::ADVERTISE:
case private_msgs::Discovery::ADVERTISE:
{
// Read the rest of the fields.
Pub publisher;
Expand Down Expand Up @@ -1004,7 +1005,7 @@ namespace gz

break;
}
case msgs::Discovery::SUBSCRIBE:
case private_msgs::Discovery::SUBSCRIBE:
{
std::string recvTopic;
// Read the topic information.
Expand Down Expand Up @@ -1044,19 +1045,19 @@ namespace gz

// Answer an ADVERTISE message.
this->SendMsg(DestinationType::ALL,
msgs::Discovery::ADVERTISE, nodeInfo);
private_msgs::Discovery::ADVERTISE, nodeInfo);
}

break;
}
case msgs::Discovery::SUBSCRIBERS_REQ:
case private_msgs::Discovery::SUBSCRIBERS_REQ:
{
if (subscribersReqCb)
subscribersReqCb();

break;
}
case msgs::Discovery::SUBSCRIBERS_REP:
case private_msgs::Discovery::SUBSCRIBERS_REP:
{
// Save the remote subscriber.
Pub publisher;
Expand All @@ -1068,7 +1069,7 @@ namespace gz
}
break;
}
case msgs::Discovery::NEW_CONNECTION:
case private_msgs::Discovery::NEW_CONNECTION:
{
// Read the rest of the fields.
Pub publisher;
Expand All @@ -1079,7 +1080,7 @@ namespace gz

break;
}
case msgs::Discovery::END_CONNECTION:
case private_msgs::Discovery::END_CONNECTION:
{
// Read the rest of the fields.
Pub publisher;
Expand All @@ -1090,12 +1091,12 @@ namespace gz

break;
}
case msgs::Discovery::HEARTBEAT:
case private_msgs::Discovery::HEARTBEAT:
{
// The timestamp has already been updated.
break;
}
case msgs::Discovery::BYE:
case private_msgs::Discovery::BYE:
{
// Remove the activity entry for this publisher.
{
Expand All @@ -1119,7 +1120,7 @@ namespace gz

break;
}
case msgs::Discovery::UNADVERTISE:
case private_msgs::Discovery::UNADVERTISE:
{
// Read the address.
Pub publisher;
Expand Down Expand Up @@ -1164,34 +1165,34 @@ namespace gz
/// or encryption.
private: template<typename T>
void SendMsg(const DestinationType &_destType,
const msgs::Discovery::Type _type,
const private_msgs::Discovery::Type _type,
const T &_pub) const
{
gz::msgs::Discovery discoveryMsg;
private_msgs::Discovery discoveryMsg;
discoveryMsg.set_version(this->Version());
discoveryMsg.set_type(_type);
discoveryMsg.set_process_uuid(this->pUuid);
_pub.FillDiscovery(discoveryMsg);

switch (_type)
{
case msgs::Discovery::ADVERTISE:
case msgs::Discovery::UNADVERTISE:
case msgs::Discovery::NEW_CONNECTION:
case msgs::Discovery::END_CONNECTION:
case private_msgs::Discovery::ADVERTISE:
case private_msgs::Discovery::UNADVERTISE:
case private_msgs::Discovery::NEW_CONNECTION:
case private_msgs::Discovery::END_CONNECTION:
{
_pub.FillDiscovery(discoveryMsg);
break;
}
case msgs::Discovery::SUBSCRIBE:
case private_msgs::Discovery::SUBSCRIBE:
{
discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
break;
}
case msgs::Discovery::HEARTBEAT:
case msgs::Discovery::BYE:
case msgs::Discovery::SUBSCRIBERS_REQ:
case msgs::Discovery::SUBSCRIBERS_REP:
case private_msgs::Discovery::HEARTBEAT:
case private_msgs::Discovery::BYE:
case private_msgs::Discovery::SUBSCRIBERS_REQ:
case private_msgs::Discovery::SUBSCRIBERS_REP:
break;
default:
std::cerr << "Discovery::SendMsg() error: Unrecognized message"
Expand Down Expand Up @@ -1223,7 +1224,7 @@ namespace gz

/// \brief Send a discovery message through all unicast relays.
/// \param[in] _msg Discovery message.
private: void SendUnicast(const msgs::Discovery &_msg) const
private: void SendUnicast(const private_msgs::Discovery &_msg) const
{
uint16_t msgSize;

Expand Down Expand Up @@ -1277,7 +1278,7 @@ namespace gz

/// \brief Send a discovery message through the multicast group.
/// \param[in] _msg Discovery message.
private: void SendMulticast(const msgs::Discovery &_msg) const
private: void SendMulticast(const private_msgs::Discovery &_msg) const
{
uint16_t msgSize;

Expand Down
20 changes: 12 additions & 8 deletions include/gz/transport/Publisher.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
#ifndef GZ_TRANSPORT_PUBLISHER_HH_
#define GZ_TRANSPORT_PUBLISHER_HH_

#include <gz/msgs/discovery.pb.h>

#include <iostream>
#include <string>

#include "gz/transport/private_msgs/discovery.pb.h"
#include "gz/transport/AdvertiseOptions.hh"
#include "gz/transport/config.hh"
#include "gz/transport/Export.hh"
Expand Down Expand Up @@ -113,11 +112,12 @@ namespace gz

/// \brief Populate a discovery message.
/// \param[in] _msg Message to fill.
public: virtual void FillDiscovery(msgs::Discovery &_msg) const;
public: virtual void FillDiscovery(private_msgs::Discovery &_msg) const;

/// \brief Set data from a discovery message.
/// \param[in] _msg Discovery message.
public: virtual void SetFromDiscovery(const msgs::Discovery &_msg);
public: virtual void SetFromDiscovery(
const private_msgs::Discovery &_msg);

/// \brief Equality operator. This function checks if the given
/// publisher has identical Topic, Addr, PUuid, NUuid, and Scope
Expand Down Expand Up @@ -234,11 +234,13 @@ namespace gz

/// \brief Populate a discovery message.
/// \param[in] _msg Message to fill.
public: virtual void FillDiscovery(msgs::Discovery &_msg) const final;
public: virtual void FillDiscovery(
private_msgs::Discovery &_msg) const final;

/// \brief Set data from a discovery message.
/// \param[in] _msg Discovery message.
public: virtual void SetFromDiscovery(const msgs::Discovery &_msg);
public: virtual void SetFromDiscovery(
const private_msgs::Discovery &_msg);

/// \brief Stream insertion operator.
/// \param[out] _out The output stream.
Expand Down Expand Up @@ -361,12 +363,14 @@ namespace gz

/// \brief Populate a discovery message.
/// \param[in] _msg Message to fill.
public: virtual void FillDiscovery(msgs::Discovery &_msg) const final;
public: virtual void FillDiscovery(
private_msgs::Discovery &_msg) const final;

/// \brief Populate a discovery message.
/// \brief Set data from a discovery message.
/// \param[in] _msg Discovery message.
public: virtual void SetFromDiscovery(const msgs::Discovery &_msg);
public: virtual void SetFromDiscovery(
const private_msgs::Discovery &_msg);

/// \brief Stream insertion operator.
/// \param[out] _out The output stream.
Expand Down
35 changes: 34 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,41 @@
# "gtest_sources" variable.
gz_get_libsources_and_unittests(sources gtest_sources)

file(GLOB gz_msgs_proto_files
${CMAKE_CURRENT_SOURCE_DIR}/proto/gz/transport/private_msgs/*.proto
)

get_target_property(msgs_desc_file
gz-msgs${GZ_MSGS_VER}::gz-msgs${GZ_MSGS_VER} GZ_MSGS_DESC_FILE)

gz_msgs_generate_messages_impl(
MSGS_GEN_SCRIPT
${gz-msgs10_PROTO_GENERATOR_SCRIPT}
FACTORY_GEN_SCRIPT
${gz-msgs10_FACTORY_GENERATOR_SCRIPT}
GZ_PROTOC_PLUGIN
${gz-msgs10_PROTO_GENERATOR_PLUGIN}
INPUT_PROTOS
${gz_msgs_proto_files}
PROTO_PACKAGE
"gz.transport.private_msgs"
PROTO_PATH
${CMAKE_CURRENT_SOURCE_DIR}/proto
OUTPUT_DIRECTORY
${PROJECT_BINARY_DIR}/gz_msgs_gen
OUTPUT_SOURCES
msgs_sources
OUTPUT_HEADERS
msgs_headers
OUTPUT_DETAIL_HEADERS
msgs_detail_headers
OUTPUT_PYTHON
msgs_python
DEPENDENCY_DESCRIPTIONS ${msgs_desc_file}
)

# Create the library target.
gz_create_core_library(SOURCES ${sources} CXX_STANDARD 17)
gz_create_core_library(SOURCES ${sources} ${msgs_sources} CXX_STANDARD 17)

# Link the libraries that we always need.
target_link_libraries(${PROJECT_LIBRARY_TARGET_NAME}
Expand Down
1 change: 0 additions & 1 deletion src/Node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*
*/
#include <gz/msgs/discovery.pb.h>
#include <gz/msgs/statistic.pb.h>

#include <algorithm>
Expand Down

0 comments on commit 77c2c47

Please sign in to comment.