Skip to content

Commit

Permalink
Merge pull request OpenDDS#4533 from mitza-oci/spdp-user-tag
Browse files Browse the repository at this point in the history
Add a configurable "user tag" to SPDP
  • Loading branch information
jrw972 authored Mar 26, 2024
2 parents 72968a9 + 502a7e6 commit 3c813b5
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 22 deletions.
7 changes: 0 additions & 7 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ name: "MPC Builds"

on:
push:
branches:
- master
- branch-DDS-3.*
- gh_wf_*
paths:
- '**'
# Don't run this workflow if the only files that changed are the
Expand All @@ -29,9 +25,6 @@ on:
- '!**/.lint_config'
- '!**/README*'
pull_request:
branches:
- master
- branch-DDS-3.*
paths:
- '**'
# Don't run this workflow if the only files that changed are the
Expand Down
13 changes: 13 additions & 0 deletions dds/DCPS/RTPS/RtpsCore.idl
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,13 @@ module OpenDDS {
DDS::OctetSeq content;
};

// UserTagSubmessage is an OpenDDS-specific extension that adds a configurable
// 32-bit value to the start of certain RTPS messages.
struct UserTagSubmessage {
SubmessageHeader smHeader;
unsigned long userTag;
};

@OpenDDS::internal::special_serialization
union Submessage switch (SubmessageKind) {
case PAD:
Expand Down Expand Up @@ -823,6 +830,12 @@ module OpenDDS {
case SRTPS_POSTFIX:
SecuritySubmessage security_sm;

// SUBMESSAGE_KIND_USER_TAG isn't included here.
// We don't need to parse incoming submessages or add a UserTagSubmessage
// to a generic collection of Submessages.
// If "vendor-specific" types are added, incoming messages need to have
// the Header::vendorId checked.

default:
SubmessageHeader unknown_sm;
};
Expand Down
14 changes: 14 additions & 0 deletions dds/DCPS/RTPS/RtpsDiscoveryConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,20 @@ RtpsDiscoveryConfig::check_source_ip(bool flag)
flag);
}

ACE_CDR::ULong
RtpsDiscoveryConfig::spdp_user_tag() const
{
return TheServiceParticipant->config_store()->get_uint32(config_key("SPDP_USER_TAG").c_str(),
0);
}

void
RtpsDiscoveryConfig::spdp_user_tag(ACE_CDR::ULong tag)
{
TheServiceParticipant->config_store()->set_uint32(config_key("SPDP_USER_TAG").c_str(),
tag);
}

} // namespace DCPS
} // namespace OpenDDS

Expand Down
3 changes: 3 additions & 0 deletions dds/DCPS/RTPS/RtpsDiscoveryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ class OpenDDS_Rtps_Export RtpsDiscoveryConfig : public OpenDDS::DCPS::RcObject {
bool check_source_ip() const;
void check_source_ip(bool flag);

ACE_CDR::ULong spdp_user_tag() const;
void spdp_user_tag(ACE_CDR::ULong tag);

private:
const String config_prefix_;
};
Expand Down
7 changes: 6 additions & 1 deletion dds/DCPS/RTPS/RtpsSubmessageKind.idl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
module OpenDDS {
module RTPS {

const octet SUBMESSAGE_VENDOR_SPECIFIC_BASE = 0x80;
const octet SUBMESSAGE_OPENDDS_BASE = SUBMESSAGE_VENDOR_SPECIFIC_BASE | 0x30;
const octet SUBMESSAGE_OPENDDS_USER_TAG = SUBMESSAGE_OPENDDS_BASE | 1;

enum SubmessageKind {
@value(0x00) RTPS_HE, /* HeaderExtension */
@value(0x01) PAD, /* Pad */
Expand All @@ -30,9 +34,10 @@ enum SubmessageKind {
@value(0x31) SEC_PREFIX,
@value(0x32) SEC_POSTFIX,
@value(0x33) SRTPS_PREFIX,
@value(0x34) SRTPS_POSTFIX
@value(0x34) SRTPS_POSTFIX,

// SubmessageKinds 0x80 and above are vendor-specific
@value(SUBMESSAGE_OPENDDS_USER_TAG) SUBMESSAGE_KIND_USER_TAG /* UserTagSubmessage */
};

};
Expand Down
44 changes: 38 additions & 6 deletions dds/DCPS/RTPS/Spdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2409,6 +2409,19 @@ Spdp::SpdpTransport::SpdpTransport(DCPS::RcHandle<Spdp> outer)
outer->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
}
#endif

const ACE_CDR::ULong userTag = outer->config_->spdp_user_tag();
if (userTag) {
user_tag_.smHeader.submessageId = SUBMESSAGE_KIND_USER_TAG;
user_tag_.smHeader.flags = FLAG_E;
user_tag_.smHeader.submessageLength = DCPS::uint32_cdr_size;
user_tag_.userTag = userTag;
} else {
user_tag_.smHeader.submessageId = 0;
user_tag_.smHeader.flags = 0;
user_tag_.smHeader.submessageLength = 0;
user_tag_.userTag = 0;
}
}

void
Expand Down Expand Up @@ -2713,7 +2726,7 @@ Spdp::SpdpTransport::write_i(WriteFlags flags)
if (!ParameterListConverter::to_param_list(pdata, plist)) {
if (DCPS::DCPS_debug_level > 0) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
ACE_TEXT("Spdp::SpdpTransport::write() - ")
ACE_TEXT("Spdp::SpdpTransport::write_i: ")
ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
ACE_TEXT("to ParameterList\n")));
}
Expand All @@ -2735,7 +2748,7 @@ Spdp::SpdpTransport::write_i(WriteFlags flags)
if (!ParameterListConverter::to_param_list(ai_map, plist)) {
if (DCPS::DCPS_debug_level > 0) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
ACE_TEXT("Spdp::SpdpTransport::write() - ")
ACE_TEXT("Spdp::SpdpTransport::write_i: ")
ACE_TEXT("failed to convert from ICE::AgentInfo ")
ACE_TEXT("to ParameterList\n")));
}
Expand All @@ -2747,11 +2760,30 @@ Spdp::SpdpTransport::write_i(WriteFlags flags)
wbuff_.reset();
DCPS::Serializer ser(&wbuff_, encoding_plain_native);
DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
if (!(ser << hdr_) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
if (DCPS::DCPS_debug_level > 0) {
if (!(ser << hdr_)) {
if (log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
ACE_TEXT("failed to serialize headers for SPDP\n")));
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i: ")
ACE_TEXT("failed to serialize RTPS header for SPDP\n")));
}
return;
}
// The implementation-specific UserTagSubmessage is designed to directly
// follow the RTPS Message Header. No other submessages should be added
// before it. This enables filtering based on a fixed offset.
if (user_tag_.smHeader.submessageId && !(ser << user_tag_)) {
if (log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i: ")
ACE_TEXT("failed to serialize user tag for SPDP\n")));
}
return;
}
if (!(ser << data_) || !(ser << encap) || !(ser << plist)) {
if (log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i: ")
ACE_TEXT("failed to serialize data submessage for SPDP\n")));
}
return;
}
Expand Down
1 change: 1 addition & 0 deletions dds/DCPS/RTPS/Spdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ class OpenDDS_Rtps_Export Spdp

DCPS::WeakRcHandle<Spdp> outer_;
Header hdr_;
UserTagSubmessage user_tag_;
DataSubmessage data_;
DCPS::SequenceNumber seq_;
u_short uni_port_;
Expand Down
8 changes: 8 additions & 0 deletions docs/devguide/run_time_configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,14 @@ Those properties, along with options specific to OpenDDS's RTPS Discovery implem

- 1 (enabled)

* - ``SpdpUserTag=i``

- Add the OpenDDS-specific UserTag RTPS submessage to the start of SPDP messages.
If i is 0 (the default), the submessage is not added.
Otherwise this submessage's contents is the 4-byte unsigned integer i.

- 0 (disabled)

.. note:: If the environment variable ``OPENDDS_RTPS_DEFAULT_D0`` is set, its value is used as the ``D0`` default value.

.. _run_time_configuration--additional-ddsi-rtps-discovery-features:
Expand Down
6 changes: 6 additions & 0 deletions docs/news.d/spdp-user-tag.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.. news-prs: 4533
.. news-start-section: Additions
- DDSI-RTPS Discovery has a new configuration option: SpdpUserTag.

.. news-end-section
11 changes: 3 additions & 8 deletions tests/transport/spdp/spdp_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ struct TestParticipant: ACE_Event_Handler {
ACE_Message_Block mb(size);
Serializer ser(&mb, encoding);

const EncapsulationHeader encap (encoding, MUTABLE);
const EncapsulationHeader encap(encoding, MUTABLE);
if (!(ser << hdr_ && ser << ds && ser << encap)) {
ACE_DEBUG((LM_DEBUG, "ERROR: failed to serialize headers\n"));
return false;
Expand Down Expand Up @@ -275,6 +275,7 @@ bool run_test()
RtpsDiscovery rd("test");
const ACE_INET_Addr local_addr(u_short(7575), "0.0.0.0");
rd.config()->spdp_local_address(NetworkAddress(local_addr));
rd.config()->spdp_user_tag(0x99887766);
const DDS::DomainId_t domain = 0;
const DDS::DomainParticipantQos qos = TheServiceParticipant->initial_DomainParticipantQos();
GUID_t id = rd.generate_participant_guid();
Expand Down Expand Up @@ -533,16 +534,10 @@ bool run_test()
int ACE_TMAIN(int, ACE_TCHAR*[])
{
DDS::DomainParticipantFactory_var dpf;
bool ok = false;
try {
dpf = TheServiceParticipant->get_domain_participant_factory();
set_DCPS_debug_level(1);
} catch (const CORBA::BAD_PARAM& ex) {
ex._tao_print_exception("Exception caught in spdp_transport.cpp:");
return 1;
}

bool ok = false;
try {
ok = run_test();
if (!ok) {
ACE_ERROR((LM_ERROR, "ERROR: test failed\n"));
Expand Down

0 comments on commit 3c813b5

Please sign in to comment.