Skip to content

Commit

Permalink
Refs #20650: Apply rev suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: JesusPoderoso <[email protected]>
  • Loading branch information
JesusPoderoso committed Jun 6, 2024
1 parent b34560d commit b039789
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 108 deletions.
6 changes: 3 additions & 3 deletions examples/cpp/delivery_mechanisms/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ std::shared_ptr<Application> Application::make_app(
switch (config.entity)
{
case CLIParser::EntityKind::PUBLISHER:
entity = std::make_shared<PublisherApp>(config.entity_configuration, topic_name);
entity = std::make_shared<PublisherApp>(config, topic_name);
break;
case CLIParser::EntityKind::SUBSCRIBER:
entity = std::make_shared<SubscriberApp>(config.entity_configuration, topic_name);
entity = std::make_shared<SubscriberApp>(config, topic_name);
break;
case CLIParser::EntityKind::PUBSUB:
entity = std::make_shared<PubSubApp>(config.entity_configuration, topic_name);
entity = std::make_shared<PubSubApp>(config, topic_name);
break;
case CLIParser::EntityKind::UNDEFINED:
default:
Expand Down
37 changes: 18 additions & 19 deletions examples/cpp/delivery_mechanisms/CLIParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <string>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/BuiltinTransports.hpp>
Expand Down Expand Up @@ -58,20 +57,14 @@ class CLIParser
DEFAULT
};

//! Entity configuration structure (shared for both publisher and subscriber applications)
struct entity_config
{
uint16_t samples = 0;
uint32_t domain = 0;
bool ignore_local_endpoints = false;
DeliveryMechanismKind delivery_mechanism = DeliveryMechanismKind::DEFAULT;
};

//! DeliveryMechanisms structure for the application
struct delivery_mechanisms_config
{
CLIParser::EntityKind entity = CLIParser::EntityKind::UNDEFINED;
entity_config entity_configuration;
bool ignore_local_endpoints = false;
uint16_t samples = 0;
uint32_t domain = 0;
DeliveryMechanismKind delivery_mechanism = DeliveryMechanismKind::DEFAULT;
};

/**
Expand Down Expand Up @@ -131,6 +124,12 @@ class CLIParser
{
delivery_mechanisms_config config;

if (argc < 2)
{
EPROSIMA_LOG_ERROR(CLI_PARSER, "missing entity argument");
print_help(EXIT_FAILURE);
}

std::string first_argument = argv[1];

if (first_argument == "publisher" )
Expand Down Expand Up @@ -177,7 +176,7 @@ class CLIParser
}
else
{
config.entity_configuration.domain = static_cast<uint16_t>(input);
config.domain = static_cast<uint16_t>(input);
}
}
catch (const std::invalid_argument& e)
Expand Down Expand Up @@ -213,7 +212,7 @@ class CLIParser
}
else
{
config.entity_configuration.samples = static_cast<uint16_t>(input);
config.samples = static_cast<uint16_t>(input);
}
}
catch (const std::invalid_argument& e)
Expand Down Expand Up @@ -241,25 +240,25 @@ class CLIParser
std::string mechanism = argv[i];
if (mechanism == "TCP" || mechanism == "tcp")
{
config.entity_configuration.delivery_mechanism = DeliveryMechanismKind::TCP;
config.delivery_mechanism = DeliveryMechanismKind::TCP;
}
else if (mechanism == "UDP" || mechanism == "udp")
{
config.entity_configuration.delivery_mechanism = DeliveryMechanismKind::UDP;
config.delivery_mechanism = DeliveryMechanismKind::UDP;
}
else if (mechanism == "SHM" || mechanism == "shm")
{
config.entity_configuration.delivery_mechanism = DeliveryMechanismKind::SHM;
config.delivery_mechanism = DeliveryMechanismKind::SHM;
}
else if (mechanism == "DATA-SHARING" || mechanism == "data-sharing")
{
config.entity_configuration.delivery_mechanism = DeliveryMechanismKind::DATA_SHARING;
config.delivery_mechanism = DeliveryMechanismKind::DATA_SHARING;
}
else if (mechanism == "INTRA-PROCESS" || mechanism == "intra-process")
{
if (config.entity == EntityKind::PUBSUB)
{
config.entity_configuration.delivery_mechanism = DeliveryMechanismKind::INTRA_PROCESS;
config.delivery_mechanism = DeliveryMechanismKind::INTRA_PROCESS;
}
else
{
Expand All @@ -284,7 +283,7 @@ class CLIParser
{
if (config.entity == EntityKind::PUBSUB)
{
config.entity_configuration.ignore_local_endpoints = true;
config.ignore_local_endpoints = true;
}
else
{
Expand Down
66 changes: 35 additions & 31 deletions examples/cpp/delivery_mechanisms/PubSubApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace examples {
namespace delivery_mechanisms {

PubSubApp::PubSubApp(
const CLIParser::entity_config& config,
const CLIParser::delivery_mechanisms_config& config,
const std::string& topic_name)
: participant_(nullptr)
, publisher_(nullptr)
Expand All @@ -75,51 +75,60 @@ PubSubApp::PubSubApp(
{
max_samples = DATAWRITER_QOS_DEFAULT.resource_limits().max_samples_per_instance;
}
// Special definitions for certain delivery mechanisms
std::shared_ptr<SharedMemTransportDescriptor> shm_transport_ = std::make_shared<SharedMemTransportDescriptor>();
shm_transport_->segment_size(shm_transport_->max_message_size() * max_samples);
std::shared_ptr<TCPv4TransportDescriptor> tcp_transport_ = std::make_shared<TCPv4TransportDescriptor>();
LibrarySettings library_settings;
Locator tcp_initial_peers_locator_;
tcp_initial_peers_locator_.kind = LOCATOR_KIND_TCPv4;
tcp_initial_peers_locator_.port = 0;
eprosima::fastrtps::rtps::IPLocator::setIPv4(tcp_initial_peers_locator_, "127.0.0.1");
pqos.transport().use_builtin_transports = false;

if (config.ignore_local_endpoints)
{
pqos.properties().properties().emplace_back(
"fastdds.ignore_local_endpoints",
"true");
}

// Transport default definitions
pqos.transport().use_builtin_transports = false;
LibrarySettings library_settings;
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;

switch (config.delivery_mechanism)
{
case CLIParser::DeliveryMechanismKind::INTRA_PROCESS:
// No transport needed, but at least a transport needs to be declared to avoid participant creation failure
case CLIParser::DeliveryMechanismKind::INTRA_PROCESS: // (It should never reach this section
{ // No transport needed, but at least a transport needs to be declared to avoid participant creation failure
pqos.transport().use_builtin_transports = true;
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_FULL;
break;
}
case CLIParser::DeliveryMechanismKind::SHM:
case CLIParser::DeliveryMechanismKind::DATA_SHARING:
{
std::shared_ptr<SharedMemTransportDescriptor> shm_transport_ =
std::make_shared<SharedMemTransportDescriptor>();
shm_transport_->segment_size(shm_transport_->max_message_size() * max_samples);
pqos.transport().user_transports.push_back(shm_transport_);
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
break;
}
case CLIParser::DeliveryMechanismKind::TCP:
{
std::shared_ptr<TCPv4TransportDescriptor> tcp_transport_ = std::make_shared<TCPv4TransportDescriptor>();
pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(5, 0);
pqos.wire_protocol().builtin.initialPeersList.push_back(tcp_initial_peers_locator_);
tcp_transport_->sendBufferSize = 0;
tcp_transport_->receiveBufferSize = 0;
tcp_transport_->set_WAN_address("127.0.0.1");
tcp_transport_->add_listener_port(5100);
pqos.transport().user_transports.push_back(tcp_transport_);
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
break;
}
case CLIParser::DeliveryMechanismKind::UDP:
default:
{
pqos.transport().user_transports.push_back(std::make_shared<UDPv4TransportDescriptor>());
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
break;
}
default:
{
pqos.transport().use_builtin_transports = true;
break;
}
}

auto factory = DomainParticipantFactory::get_instance();
factory->set_library_settings(library_settings);
participant_ = factory->create_participant(config.domain, pqos, nullptr, StatusMask::none());
Expand Down Expand Up @@ -175,20 +184,15 @@ PubSubApp::PubSubApp(
writer_qos.resource_limits().max_samples_per_instance = max_samples;
reader_qos.resource_limits().max_samples = reader_qos.resource_limits().max_instances * max_samples;
writer_qos.resource_limits().max_samples = writer_qos.resource_limits().max_instances * max_samples;
switch (config.delivery_mechanism)
if (CLIParser::DeliveryMechanismKind::DATA_SHARING == config.delivery_mechanism)
{
case CLIParser::DeliveryMechanismKind::DATA_SHARING:
reader_qos.data_sharing().automatic();
writer_qos.data_sharing().automatic();
break;
case CLIParser::DeliveryMechanismKind::SHM:
case CLIParser::DeliveryMechanismKind::TCP:
case CLIParser::DeliveryMechanismKind::UDP:
case CLIParser::DeliveryMechanismKind::INTRA_PROCESS:
default:
reader_qos.data_sharing().off();
writer_qos.data_sharing().off();
break;
reader_qos.data_sharing().automatic();
writer_qos.data_sharing().automatic();
}
else
{
reader_qos.data_sharing().off();
writer_qos.data_sharing().off();
}
reader_ = subscriber_->create_datareader(topic_, reader_qos, this, StatusMask::all());
if (reader_ == nullptr)
Expand Down
2 changes: 1 addition & 1 deletion examples/cpp/delivery_mechanisms/PubSubApp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PubSubApp : public Application, public DataReaderListener, public DataWrit
public:

PubSubApp(
const CLIParser::entity_config& config,
const CLIParser::delivery_mechanisms_config& config,
const std::string& topic_name);

~PubSubApp();
Expand Down
53 changes: 30 additions & 23 deletions examples/cpp/delivery_mechanisms/PublisherApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace examples {
namespace delivery_mechanisms {

PublisherApp::PublisherApp(
const CLIParser::entity_config& config,
const CLIParser::delivery_mechanisms_config& config,
const std::string& topic_name)
: participant_(nullptr)
, publisher_(nullptr)
Expand All @@ -65,39 +65,51 @@ PublisherApp::PublisherApp(
{
max_samples = DATAWRITER_QOS_DEFAULT.resource_limits().max_samples_per_instance;
}
// Special definitions for certain delivery mechanisms
std::shared_ptr<SharedMemTransportDescriptor> shm_transport_ = std::make_shared<SharedMemTransportDescriptor>();
shm_transport_->segment_size(shm_transport_->max_message_size() * max_samples);
std::shared_ptr<TCPv4TransportDescriptor> tcp_transport_ = std::make_shared<TCPv4TransportDescriptor>();
LibrarySettings library_settings;

// Transport default definitions
pqos.transport().use_builtin_transports = false;
LibrarySettings library_settings;
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;

switch (config.delivery_mechanism)
{
case CLIParser::DeliveryMechanismKind::INTRA_PROCESS: // (It should never reach this section)
// No transport needed, but at least a transport needs to be declared to avoid participant creation failure
case CLIParser::DeliveryMechanismKind::INTRA_PROCESS: // (It should never reach this section
{ // No transport needed, but at least a transport needs to be declared to avoid participant creation failure
pqos.transport().use_builtin_transports = true;
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_FULL;
break;
}
case CLIParser::DeliveryMechanismKind::SHM:
case CLIParser::DeliveryMechanismKind::DATA_SHARING:
{
std::shared_ptr<SharedMemTransportDescriptor> shm_transport_ =
std::make_shared<SharedMemTransportDescriptor>();
shm_transport_->segment_size(shm_transport_->max_message_size() * max_samples);
pqos.transport().user_transports.push_back(shm_transport_);
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
break;
}
case CLIParser::DeliveryMechanismKind::TCP:
{
std::shared_ptr<TCPv4TransportDescriptor> tcp_transport_ = std::make_shared<TCPv4TransportDescriptor>();
pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(5, 0);
tcp_transport_->sendBufferSize = 0;
tcp_transport_->receiveBufferSize = 0;
tcp_transport_->set_WAN_address("127.0.0.1");
tcp_transport_->add_listener_port(5100);
pqos.transport().user_transports.push_back(tcp_transport_);
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
break;
}
case CLIParser::DeliveryMechanismKind::UDP:
default:
{
pqos.transport().user_transports.push_back(std::make_shared<UDPv4TransportDescriptor>());
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
break;
}
default:
{
pqos.transport().use_builtin_transports = true;
break;
}
}

auto factory = DomainParticipantFactory::get_instance();
Expand Down Expand Up @@ -138,18 +150,13 @@ PublisherApp::PublisherApp(
writer_qos.history().depth = max_samples;
writer_qos.resource_limits().max_samples_per_instance = max_samples;
writer_qos.resource_limits().max_samples = writer_qos.resource_limits().max_instances * max_samples;
switch (config.delivery_mechanism)
if (CLIParser::DeliveryMechanismKind::DATA_SHARING == config.delivery_mechanism)
{
case CLIParser::DeliveryMechanismKind::DATA_SHARING:
writer_qos.data_sharing().automatic();
break;
case CLIParser::DeliveryMechanismKind::SHM:
case CLIParser::DeliveryMechanismKind::TCP:
case CLIParser::DeliveryMechanismKind::UDP:
case CLIParser::DeliveryMechanismKind::INTRA_PROCESS:
default:
writer_qos.data_sharing().off();
break;
writer_qos.data_sharing().automatic();
}
else
{
writer_qos.data_sharing().off();
}
writer_ = publisher_->create_datawriter(topic_, writer_qos, this, StatusMask::all());
if (writer_ == nullptr)
Expand Down
2 changes: 1 addition & 1 deletion examples/cpp/delivery_mechanisms/PublisherApp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PublisherApp : public Application, public DataWriterListener
public:

PublisherApp(
const CLIParser::entity_config& config,
const CLIParser::delivery_mechanisms_config& config,
const std::string& topic_name);

~PublisherApp();
Expand Down
Loading

0 comments on commit b039789

Please sign in to comment.