Skip to content

Commit

Permalink
PortMode for RTPS/UDP, Fixes for RTPS Disc Ports
Browse files Browse the repository at this point in the history
  • Loading branch information
iguessthislldo committed May 25, 2024
1 parent b4a030c commit b547e36
Show file tree
Hide file tree
Showing 14 changed files with 695 additions and 161 deletions.
28 changes: 25 additions & 3 deletions dds/DCPS/NetworkResource.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
/*
*
*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/

#include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
#include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/

#include "NetworkResource.h"

#include "LogAddr.h"
#include "TimeTypes.h"
#include "debug.h"

#include <ace/OS_NS_netdb.h>
#include <ace/Sock_Connect.h>
Expand Down Expand Up @@ -435,6 +434,29 @@ bool set_socket_multicast_ttl(const ACE_SOCK_Dgram& socket, const unsigned char&
return true;
}

bool set_recvpktinfo(ACE_SOCK_Dgram& sock, bool ipv4)
{
bool success = true;
#if defined ACE_RECVPKTINFO || defined ACE_RECVPKTINFO6
if (ipv4) {
# ifdef ACE_RECVPKTINFO
success = set_sock_opt(sock, IPPROTO_IP, ACE_RECVPKTINFO, 1);
# endif
} else {
# ifdef ACE_RECVPKTINFO6
success = set_sock_opt(sock, IPPROTO_IPV6, ACE_RECVPKTINFO6, 1);
# endif
}
if (!success && log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: set_recvpktinfo: failed to set RECVPKTINFO: %m\n"));
}
# else
ACE_UNUSED_ARG(sock);
ACE_UNUSED_ARG(ipv4);
# endif
return success;
}

bool open_appropriate_socket_type(ACE_SOCK_Dgram& socket, const ACE_INET_Addr& local_address, int* proto_family)
{
#if defined (ACE_HAS_IPV6) && defined (IPV6_V6ONLY)
Expand Down
13 changes: 13 additions & 0 deletions dds/DCPS/NetworkResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ void get_interface_addrs(OPENDDS_VECTOR(ACE_INET_Addr)& addrs);
extern OpenDDS_Dcps_Export
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram& socket, const unsigned char& ttl);

template <typename T>
bool set_sock_opt(ACE_SOCK_Dgram& sock,
int level, int option, T value, bool ignore_notsup = false)
{
if (sock.set_option(level, option, (void*) &value, sizeof(T)) < 0) {
return ignore_notsup && errno == ENOTSUP;
}
return true;
}

OpenDDS_Dcps_Export
bool set_recvpktinfo(ACE_SOCK_Dgram& sock, bool ipv4);

/// Helper function to create dual stack socket to support IPV4 and IPV6,
/// for IPV6 builds allows for setting IPV6_V6ONLY socket option to 0 before binding
/// Otherwise defaults to opening a socket based on the type of local_address
Expand Down
11 changes: 6 additions & 5 deletions dds/DCPS/RTPS/MessageUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,15 @@ bool get_rtps_port(DDS::UInt16& port_result, const char* what,
{
const DDS::UInt32 port = static_cast<DDS::UInt32>(port_base) +
domain * domain_gain + part * part_gain + offset;
ACE_DEBUG((LM_DEBUG, "HERE %C %u + %u * %u + %u * %u + %u = %u\n", what, port_base, domain, domain_gain, part, part_gain, offset, port));
port_result = static_cast<DDS::UInt16>(port);
if (port > 65535) {
if (log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: rtps_port: %C port %u is too high\n", what, port));
if (log_level >= LogLevel::Warning) {
ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: rtps_port: "
"%C port %u is going to be truncated to %u. This behavior is deprecated, please reduce "
"domain ID or other RTPS port parameters.\n",
what, port, port_result));
}
return false;
}
port_result = static_cast<DDS::UInt16>(port);
return true;
}

Expand Down
23 changes: 13 additions & 10 deletions dds/DCPS/RTPS/Sedp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ Sedp::init(const GUID_t& guid,
DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
OPENDDS_STRING("_SEDPTransportInst_") + key + domainStr,
"rtps_udp");
DCPS::RtpsUdpInst_rch inst = DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);

// Be careful to not call any function that causes the transport be
// to created before the configuration is complete.
Expand Down Expand Up @@ -454,25 +455,27 @@ Sedp::init(const GUID_t& guid,
config_store_->set(transport_inst_->config_key("MULTICAST_INTERFACE").c_str(), disco.multicast_interface());
}

DCPS::NetworkAddress addr4;
if (!disco.config()->sedp_unicast_address(addr4, domainId, ipv4_participant_port_id)) {
return DDS::RETCODE_ERROR;
}
inst->port_mode(disco.config()->sedp_port_mode());
inst->pb(disco.config()->pb());
inst->dg(disco.config()->dg());
inst->pg(disco.config()->pg());
inst->d2(disco.config()->dx());
inst->d3(disco.config()->dy());
inst->d3(disco.config()->dy());

inst->init_participant_port_id(ipv4_participant_port_id);
config_store_->set(transport_inst_->config_key("LOCAL_ADDRESS").c_str(),
addr4,
disco.config()->sedp_local_address(),
ConfigStoreImpl::Format_Required_Port,
ConfigStoreImpl::Kind_IPV4);
config_store_->set(transport_inst_->config_key("ADVERTISED_ADDRESS").c_str(),
disco.config()->sedp_advertised_local_address(),
ConfigStoreImpl::Format_Required_Port,
ConfigStoreImpl::Kind_IPV4);
#ifdef ACE_HAS_IPV6
DCPS::NetworkAddress addr6;
if (!disco.config()->ipv6_sedp_unicast_address(addr6, domainId, ipv6_participant_port_id)) {
return DDS::RETCODE_ERROR;
}
inst->ipv6_init_participant_port_id(ipv6_participant_port_id);
config_store_->set(transport_inst_->config_key("IPV6_LOCAL_ADDRESS").c_str(),
addr6,
disco.config()->ipv6_sedp_local_address(),
ConfigStoreImpl::Format_Required_Port,
ConfigStoreImpl::Kind_IPV6);
config_store_->set(transport_inst_->config_key("IPV6_ADVERTISED_ADDRESS").c_str(),
Expand Down
4 changes: 3 additions & 1 deletion dds/DCPS/RTPS/Sedp.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ class Sedp : public virtual DCPS::RcEventHandler {
public:
Endpoint(const DCPS::GUID_t& repo_id, Sedp& sedp)
: repo_id_(repo_id)
, domain_id_(sedp.get_domain_id())
, sedp_(sedp)
, shutting_down_(false)
#ifdef OPENDDS_SECURITY
Expand All @@ -612,7 +613,7 @@ class Sedp : public virtual DCPS::RcEventHandler {

DDS::DomainId_t domain_id() const
{
return 0; // not used for SEDP
return domain_id_;
}

CORBA::Long get_priority_value(const DCPS::AssociationData&) const
Expand Down Expand Up @@ -665,6 +666,7 @@ class Sedp : public virtual DCPS::RcEventHandler {

protected:
DCPS::GUID_t repo_id_;
DDS::DomainId_t domain_id_;
Sedp& sedp_;
AtomicBool shutting_down_;
#ifdef OPENDDS_SECURITY
Expand Down
47 changes: 10 additions & 37 deletions dds/DCPS/RTPS/Spdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ using DCPS::ENDIAN_LITTLE;
using DCPS::LogLevel;
using DCPS::log_level;
using DCPS::LogAddr;
using DCPS::set_sock_opt;

namespace {
const Encoding encoding_plain_big(Encoding::KIND_XCDR1, ENDIAN_BIG);
Expand Down Expand Up @@ -2384,10 +2385,7 @@ Spdp::SpdpTransport::SpdpTransport(DCPS::RcHandle<Spdp> outer)
const DCPS::NetworkAddressSet addrs = outer->config_->spdp_send_addrs();
send_addrs_.insert(addrs.begin(), addrs.end());

#ifdef OPENDDS_SAFETY_PROFILE
const DDS::UInt16 startingParticipantId = outer->ipv4_participant_port_id_;
#endif

const DDS::UInt16 max_part_id = 119; // RTPS 2.5 9.6.2.3
while (!open_unicast_socket(outer->ipv4_participant_port_id_)) {
if (outer->ipv4_participant_port_id_ == max_part_id && log_level >= LogLevel::Warning) {
Expand All @@ -2399,12 +2397,18 @@ Spdp::SpdpTransport::SpdpTransport(DCPS::RcHandle<Spdp> outer)
// could use this as a hard limit, but that's much less of a concern.
}
++outer->ipv4_participant_port_id_;
if (outer->ipv4_participant_port_id_ == startingParticipantId) {
throw std::runtime_error("could not find a free IPv4 unicast port for SPDP");
}
}

#ifdef ACE_HAS_IPV6
outer->ipv6_participant_port_id_ = outer->ipv4_participant_port_id_;
while (!open_unicast_ipv6_socket(outer->ipv6_participant_port_id_)) {
++outer->ipv6_participant_port_id_;
if (outer->ipv4_participant_port_id_ == outer->ipv6_participant_port_id_) {
throw std::runtime_error("could not find a free IPv6 unicast port for SPDP");
}
}
#endif

Expand Down Expand Up @@ -3447,18 +3451,6 @@ Spdp::SpdpTransport::open_unicast_socket(DDS::UInt16 participant_id)
return true;
}

namespace {
template <typename T>
bool set_sock_opt(ACE_SOCK_Dgram& sock,
int level, int option, T value, bool ignore_notsup = false)
{
if (sock.set_option(level, option, (void*) &value, sizeof(T)) < 0) {
return ignore_notsup && errno == ENOTSUP;
}
return true;
}
}

void Spdp::SpdpTransport::set_unicast_socket_opts(
DCPS::RcHandle<Spdp>& outer, ACE_SOCK_Dgram& sock, DDS::UInt16& port)
{
Expand All @@ -3467,18 +3459,13 @@ void Spdp::SpdpTransport::set_unicast_socket_opts(
throw std::runtime_error("failed to get address from socket");
}
port = addr.get_port_number();
const bool ipv6 =
#ifdef ACE_HAS_IPV6
addr.get_type () == AF_INET6;
#else
false;
#endif
const bool ipv4 = addr.get_type () == AF_INET;

if (DCPS::DCPS_debug_level > 3) {
ACE_DEBUG((LM_DEBUG,
"(%P|%t) Spdp::SpdpTransport::set_unicast_socket_opts: "
"opened %C unicast socket %d on port %d\n",
ipv6 ? "IPv6" : "IPv4", sock.get_handle(), port));
ipv4 ? "IPv4" : "IPv6", sock.get_handle(), port));
}

if (!DCPS::set_socket_multicast_ttl(sock, outer->config_->ttl())) {
Expand Down Expand Up @@ -3509,21 +3496,7 @@ void Spdp::SpdpTransport::set_unicast_socket_opts(
throw std::runtime_error("failed to set recv buffer size");
}

bool success = true;
if (ipv6) {
#ifdef ACE_RECVPKTINFO6
success = set_sock_opt(sock, IPPROTO_IPV6, ACE_RECVPKTINFO6, 1);
#endif
} else {
#ifdef ACE_RECVPKTINFO
success = set_sock_opt(sock, IPPROTO_IP, ACE_RECVPKTINFO, 1);
#endif
}
if (!success) {
if (log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Spdp::SpdpTransport::set_unicast_socket_opts: "
"failed to set RECVPKTINFO: %m\n"));
}
if (!DCPS::set_recvpktinfo(sock, ipv4)) {
throw std::runtime_error("failed to set RECVPKTINFO");
}
}
Expand Down
10 changes: 5 additions & 5 deletions dds/DCPS/transport/framework/TransportInst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,23 +287,23 @@ TransportInst::get_or_create_impl(DDS::DomainId_t domain,
{
ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, g, lock_, TransportImpl_rch());

// Only use the domain if the inst is a template.
// Only use the domain to find the impl is if this inst is a template.
// Furthermore, only use the client if the instantiation rule is per_participant.

DDS::DomainId_t find_domain = domain;
if (is_template_) {
if (instantiation_rule() != "per_participant") {
participant = 0;
}
} else {
domain = 0;
find_domain = 0;
participant = 0;
}

if (!shutting_down_) {
try {
DomainMap::iterator pos = domain_map_.find(domain);
DomainMap::iterator pos = domain_map_.find(find_domain);
if (pos == domain_map_.end()) {
pos = domain_map_.insert(std::make_pair(domain, ParticipantMap())).first;
pos = domain_map_.insert(std::make_pair(find_domain, ParticipantMap())).first;
}
ParticipantMap::iterator pos2 = pos->second.find(participant);
if (pos2 == pos->second.end()) {
Expand Down
Loading

0 comments on commit b547e36

Please sign in to comment.