diff --git a/.gitignore b/.gitignore
index 93d1923..57e99d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -85,6 +85,7 @@ install_manifest.txt
compile_commands.json
CTestTestfile.cmake
build
+install
### Eclipse ###
@@ -139,10 +140,10 @@ local.properties
.worksheet
### Eclipse Patch ###
-# Eclipse Core
+# Eclipse Core
.project
-# JDT-specific (Eclipse Java Development Tools)
+# JDT-specific (Eclipse Java Development Tools)
.classpath
### Linux ###
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8dd1ab9..edecb11 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -27,11 +27,11 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif()
-project(microxrcedds VERSION "2.0.0")
+project(microxrcedds VERSION "2.1.0")
-set(_client_tag master)
-set(_client_version 2.0.0)
-set(_agent_tag master)
+set(_client_tag v2.1.0)
+set(_client_version 2.1.0)
+set(_agent_tag v2.1.0)
set(_gen_tag master)
###############################################################################
@@ -40,6 +40,7 @@ set(_gen_tag master)
option(UXRCE_BUILD_EXAMPLES "Build and install examples." OFF)
option(UXRCE_ENABLE_CLIENT "Enable the building and installation of Micro XRCE-DDS Client." ON)
option(UXRCE_ENABLE_AGENT "Enable the building and installation of Micro XRCE-DDS Agent." ON)
+option(UXRCE_BUILD_AGENT_EXECUTABLE "Enable the buildin Micro XRCE-DDS Agent executable." OFF)
option(UXRCE_ENABLE_GEN "Enable the building and installation of Micro XRCE-DDS Gen." OFF)
option(UXRCE_BUILD_TESTS "Build tests." OFF)
option(UXRCE_BUILD_PROFILING "Build profiling test executables.")
@@ -117,6 +118,7 @@ if(UXRCE_ENABLE_CLIENT)
-DUCLIENT_BUILD_TESTS:BOOL=${UXRCE_BUILD_TESTS}
-DUCLIENT_ISOLATED_INSTALL:BOOL=OFF
-DGTEST_INDIVIDUAL:BOOL=ON
+ -DUCLIENT_PROFILE_CAN:BOOL=ON
)
list(APPEND _deps client)
endif()
@@ -153,7 +155,7 @@ if(UXRCE_ENABLE_AGENT)
-DUAGENT_BUILD_TESTS:BOOL=${UXRCE_BUILD_TESTS}
-DUAGENT_P2P_CLIENT_TAG:STRING=${_client_tag}
-DUAGENT_P2P_CLIENT_VERSION:STRING=${_client_version}
- -DUAGENT_BUILD_EXECUTABLE:BOOL=OFF
+ -DUAGENT_BUILD_EXECUTABLE:BOOL=${UXRCE_BUILD_AGENT_EXECUTABLE}
-DUAGENT_ISOLATED_INSTALL:BOOL=OFF
-DGTEST_INDIVIDUAL:BOOL=ON
DEPENDS
diff --git a/Dockerfile b/Dockerfile
index b8bc25b..8d283e7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -33,6 +33,7 @@ RUN cd /uxrce/build && \
cmake \
-DCMAKE_INSTALL_PREFIX=../install \
-DUXRCE_BUILD_EXAMPLES=ON \
+ -DUXRCE_BUILD_AGENT_EXECUTABLE=ON \
.. &&\
make -j $(nproc) && make install
diff --git a/README.md b/README.md
index 4ca6d8f..adca47c 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
*eProsima Micro XRCE-DDS* is a library implementing the [DDS-XRCE protocol](https://www.omg.org/spec/DDS-XRCE/About-DDS-XRCE/) as defined and maintained by the OMG, whose aim is to allow resource constrained devices such as microcontrollers to communicate with the [DDS](https://www.omg.org/spec/DDS/About-DDS/>) world as any other DDS actor would do.
It follows a client/server paradigm and is composed by two libraries, the *Micro XRCE-DDS Client* and the *Micro XRCE-DDS Agent*. The *Micro XRCE-DDS Clients* are lightweight entities meant to be compiled on e**X**tremely **R**esource **C**onstrained **E**nvironments, while the *Micro XRCE-DDS Agent* is a broker which bridges the *Clients* with the DDS world.
-
+
The *Micro XRCE-DDS Clients* request operations to the *Agent* to publish and/or subscribe to topics in the DDS global dataspace. Remote procedure calls, as defined by the [DDS-RPC standard](https://www.omg.org/spec/DDS-RPC/About-DDS-RPC/), are also supported, allowing *Clients* to communicate in the DDS dataspace according to a request/reply paradigm.
The *Agents* process these requests and send back a response with the operation status result and with the requested data, in the case of subscribe/reply operations.
@@ -25,7 +25,7 @@ This is made possible by the creation of *DDS Entities* on the *Agent* as a resu
The communication between a *Micro XRCE-DDS Client* and a *Micro XRCE-DDS Agent* is achieved by means of several kinds of built-in transports: **UDPv4**, **UDPv6**, **TCPv4**, **TCPv6** and **Serial** communication. In addition, there is the possibility for the user to generate its own **Custom** transport.
-
+
This repository contains the totality of the *eProsima Micro XRCE-DDS* products:
diff --git a/test/test/client_agent/CMakeLists.txt b/test/test/client_agent/CMakeLists.txt
index a3e7f9b..00a327d 100644
--- a/test/test/client_agent/CMakeLists.txt
+++ b/test/test/client_agent/CMakeLists.txt
@@ -19,11 +19,18 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/DEFAULT_FASTRTPS_PROFILES.xml.in
@ONLY
)
-add_executable(client-agent-test ClientAgentInteraction.cpp)
+set(SRCS ClientAgentInteraction.cpp)
+
+if(NOT WIN32)
+ list(APPEND SRCS ClientAgentSerial.cpp)
+ list(APPEND SRCS ClientAgentCan.cpp)
+endif()
+
+add_executable(client-agent-test ${SRCS})
add_gtest(client-agent-test
SOURCES
- ClientAgentInteraction.cpp
+ ${SRCS}
ENVIRONMENTS
$<$:LD_LIBRARY_PATH=${CMAKE_PREFIX_PATH}/lib>
$<$:PATH=${CMAKE_PREFIX_PATH}/bin>
diff --git a/test/test/client_agent/ClientAgentCan.cpp b/test/test/client_agent/ClientAgentCan.cpp
new file mode 100644
index 0000000..14ca84b
--- /dev/null
+++ b/test/test/client_agent/ClientAgentCan.cpp
@@ -0,0 +1,15 @@
+#include
+#include
+#include
+
+#include "ClientAgentCan.hpp"
+
+TEST_P(ClientAgentCan, PingFromClientToAgent)
+{
+ ASSERT_NO_FATAL_FAILURE(client_can_.ping_agent());
+}
+
+INSTANTIATE_TEST_CASE_P(
+ CanTransports,
+ ClientAgentCan,
+ ::testing::Values(MiddlewareKind::FASTDDS));
\ No newline at end of file
diff --git a/test/test/client_agent/ClientAgentCan.hpp b/test/test/client_agent/ClientAgentCan.hpp
new file mode 100644
index 0000000..be483de
--- /dev/null
+++ b/test/test/client_agent/ClientAgentCan.hpp
@@ -0,0 +1,98 @@
+#ifndef IN_TEST_CLIENTCAN_INT_HPP
+#define IN_TEST_CLIENTCAN_INT_HPP
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+class AgentCan
+{
+public:
+ const char * dev = "vcan0";
+ const uint32_t can_id = 0x00000001; // TODO: test different can_id
+
+ AgentCan(MiddlewareKind middleware)
+ : middleware_{}
+ {
+ switch (middleware)
+ {
+ case MiddlewareKind::FASTDDS:
+ middleware_ = eprosima::uxr::Middleware::Kind::FASTDDS;
+ break;
+ case MiddlewareKind::FASTRTPS:
+ middleware_ = eprosima::uxr::Middleware::Kind::FASTRTPS;
+ break;
+ case MiddlewareKind::CED:
+ middleware_ = eprosima::uxr::Middleware::Kind::CED;
+ break;
+ }
+ }
+
+ ~AgentCan()
+ {}
+
+ bool is_interface_up(const char * interface)
+ {
+ struct ifreq ifr;
+ int sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
+ memset(&ifr, 0, sizeof(ifr));
+ strcpy(ifr.ifr_name, interface);
+ ioctl(sock, SIOCGIFFLAGS, &ifr);
+ close(sock);
+ return !!(ifr.ifr_flags & IFF_UP);
+ }
+
+ void start()
+ {
+ if (!is_interface_up(dev))
+ {
+ ASSERT_TRUE(0 == system("ip link add dev vcan0 type vcan && ip link set vcan0 mtu 72 && ip link set dev vcan0 up"));
+ }
+
+ agent_can_.reset(new eprosima::uxr::CanAgent(dev, can_id, middleware_));
+ agent_can_->set_verbose_level(6);
+ ASSERT_TRUE(agent_can_->start());
+ }
+
+ void stop()
+ {
+ ASSERT_TRUE(agent_can_->stop());
+ }
+
+private:
+ std::unique_ptr agent_can_;
+ eprosima::uxr::Middleware::Kind middleware_;
+};
+
+class ClientAgentCan : public ::testing::TestWithParam
+{
+public:
+ ClientAgentCan()
+ : client_can_(0.0f, 8)
+ , agent_(GetParam())
+ {}
+
+ ~ClientAgentCan()
+ {}
+
+ void SetUp() override
+ {
+ agent_.start();
+ ASSERT_NO_FATAL_FAILURE(client_can_.init_transport(agent_.dev, agent_.can_id + 1));
+ }
+
+ void TearDown() override
+ {
+ ASSERT_NO_FATAL_FAILURE(client_can_.close_transport());
+ agent_.stop();
+ }
+
+protected:
+ ClientCan client_can_;
+ AgentCan agent_;
+};
+
+#endif // ifndef IN_TEST_CLIENTCAN_INT_HPP
\ No newline at end of file
diff --git a/test/test/client_agent/ClientAgentInteraction.cpp b/test/test/client_agent/ClientAgentInteraction.cpp
index 7a1d17e..e2ebd12 100644
--- a/test/test/client_agent/ClientAgentInteraction.cpp
+++ b/test/test/client_agent/ClientAgentInteraction.cpp
@@ -1,215 +1,12 @@
#include
-#ifdef _WIN32
-#include
-#include
-#include
-#include
-#else
-#include
-#include
-#include
-#include
-#endif
-
-#include
#include
#include
-class ClientAgentInteraction : public ::testing::TestWithParam>
-{
-public:
- const uint16_t AGENT_PORT = 2018 + uint16_t(std::get<0>(GetParam()));
- const float LOST = 0.1f;
- static const uint8_t INIT_CLOSE_RETRIES = 20;
-
- ClientAgentInteraction()
- : transport_(std::get<0>(GetParam()))
- , middleware_{}
- , client_(0.0f, 8)
- {
- switch (std::get<1>(GetParam()))
- {
- case MiddlewareKind::FASTDDS:
- middleware_ = eprosima::uxr::Middleware::Kind::FASTDDS;
- break;
- case MiddlewareKind::FASTRTPS:
- middleware_ = eprosima::uxr::Middleware::Kind::FASTRTPS;
- break;
- case MiddlewareKind::CED:
- middleware_ = eprosima::uxr::Middleware::Kind::CED;
- break;
- }
- }
-
- ~ClientAgentInteraction()
- {}
+#include "ClientAgentInteraction.hpp"
- void SetUp() override
- {
- start_agent(AGENT_PORT);
- switch (transport_)
- {
- case Transport::UDP_IPV4_TRANSPORT:
- case Transport::TCP_IPV4_TRANSPORT:
- {
- ASSERT_NO_FATAL_FAILURE(client_.init_transport(transport_, "127.0.0.1", std::to_string(AGENT_PORT).c_str()));
- break;
- }
- case Transport::UDP_IPV6_TRANSPORT:
- case Transport::TCP_IPV6_TRANSPORT:
- {
- ASSERT_NO_FATAL_FAILURE(client_.init_transport(transport_, "::1", std::to_string(AGENT_PORT).c_str()));
- break;
- }
- case Transport::CUSTOM_WITHOUT_FRAMING:
- case Transport::CUSTOM_WITH_FRAMING:
- {
- ASSERT_NO_FATAL_FAILURE(client_.init_transport(transport_, NULL, NULL));
- break;
- }
- }
- }
-
- void TearDown() override
- {
- ASSERT_NO_FATAL_FAILURE(client_.close_transport(transport_));
- stop_agent(AGENT_PORT);
- }
-
- // TODO (#4334 - @jamoralp): Add serial tests.
- void start_agent(uint16_t port)
- {
- switch(transport_)
- {
- case Transport::UDP_IPV4_TRANSPORT:
- {
- agent_udp4_.reset(new eprosima::uxr::UDPv4Agent(port, middleware_));
- agent_udp4_->set_verbose_level(6);
- ASSERT_TRUE(agent_udp4_->start());
- break;
- }
- case Transport::UDP_IPV6_TRANSPORT:
- {
- agent_udp6_.reset(new eprosima::uxr::UDPv6Agent(port, middleware_));
- agent_udp6_->set_verbose_level(6);
- ASSERT_TRUE(agent_udp6_->start());
- break;
- }
- case Transport::TCP_IPV4_TRANSPORT:
- {
- agent_tcp4_.reset(new eprosima::uxr::TCPv4Agent(port, middleware_));
- agent_tcp4_->set_verbose_level(6);
- ASSERT_TRUE(agent_tcp4_->start());
- break;
- }
- case Transport::TCP_IPV6_TRANSPORT:
- {
- agent_tcp6_.reset(new eprosima::uxr::TCPv6Agent(port, middleware_));
- agent_tcp6_->set_verbose_level(6);
- ASSERT_TRUE(agent_tcp6_->start());
- break;
- }
- case Transport::CUSTOM_WITHOUT_FRAMING:
- {
- try
- {
- agent_custom_endpoint_.add_member("index");
- }
- catch(const std::exception& /*e*/)
- {
- // Do nothing
- }
-
-
- agent_custom_.reset(new eprosima::uxr::CustomAgent(
- "custom_agent",
- &agent_custom_endpoint_,
- middleware_,
- false,
- agent_custom_transport_open,
- agent_custom_transport_close,
- agent_custom_transport_write_packet,
- agent_custom_transport_read_packet));
- agent_custom_->set_verbose_level(6);
- ASSERT_TRUE(agent_custom_->start());
- break;
- }
- case Transport::CUSTOM_WITH_FRAMING:
- {
- try
- {
- agent_custom_endpoint_.add_member("index");
- }
- catch(const std::exception& /*e*/)
- {
- // Do nothing
- }
-
- agent_custom_.reset(new eprosima::uxr::CustomAgent(
- "custom_agent",
- &agent_custom_endpoint_,
- middleware_,
- true,
- agent_custom_transport_open,
- agent_custom_transport_close,
- agent_custom_transport_write_stream,
- agent_custom_transport_read_stream));
- agent_custom_->set_verbose_level(6);
- ASSERT_TRUE(agent_custom_->start());
- break;
- }
-
- }
- }
-
- void stop_agent(uint16_t port)
- {
- switch(transport_)
- {
- case Transport::UDP_IPV4_TRANSPORT:
- {
- ASSERT_TRUE(agent_udp4_->stop());
- break;
- }
- case Transport::UDP_IPV6_TRANSPORT:
- {
- ASSERT_TRUE(agent_udp6_->stop());
- break;
- }
- case Transport::TCP_IPV4_TRANSPORT:
- {
- ASSERT_TRUE(agent_tcp4_->stop());
- break;
- }
- case Transport::TCP_IPV6_TRANSPORT:
- {
- ASSERT_TRUE(agent_tcp6_->stop());
- break;
- }
- case Transport::CUSTOM_WITHOUT_FRAMING:
- case Transport::CUSTOM_WITH_FRAMING:
- {
- ASSERT_TRUE(agent_custom_->stop());
- break;
- }
- }
- }
-
-protected:
- Transport transport_;
- std::unique_ptr agent_udp4_;
- std::unique_ptr agent_udp6_;
- std::unique_ptr agent_tcp4_;
- std::unique_ptr agent_tcp6_;
- std::unique_ptr agent_custom_;
- eprosima::uxr::CustomEndPoint agent_custom_endpoint_;
-
- eprosima::uxr::Middleware::Kind middleware_;
- Client client_;
-};
TEST_P(ClientAgentInteraction, InitCloseSession)
{
@@ -264,6 +61,50 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationXMLReliable)
}
}
+TEST_P(ClientAgentInteraction, NewEntitiesCreationBINBestEffort)
+{
+ switch (std::get<1>(GetParam()))
+ {
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ // Not implemented
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
+ }
+}
+
+TEST_P(ClientAgentInteraction, NewEntitiesCreationBINReliable)
+{
+ switch (std::get<1>(GetParam()))
+ {
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ // Not implemented
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ }
+}
+
TEST_P(ClientAgentInteraction, NewEntitiesCreationREFBestEffort)
{
switch (std::get<1>(GetParam()))
@@ -333,6 +174,30 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseXMLXMLReliable)
}
}
+TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseBINBINReliable)
+{
+ switch (std::get<1>(GetParam()))
+ {
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ // Not implemented
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
+ }
+}
+
/* TODO (#3589): Fix XML and REF reference issue to enable this test.
TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseXMLREFReliable)
{
@@ -366,7 +231,7 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseREFREFReliable)
}
}
-TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReliable)
+TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceXMLXMLReliable)
{
switch (std::get<1>(GetParam()))
{
@@ -391,7 +256,31 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReliable)
}
}
-TEST_P(ClientAgentInteraction, ExistantEntitiesCreationNoReplaceReliable)
+TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceBINBINReliable)
+{
+ switch (std::get<1>(GetParam()))
+ {
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ // Not implemented
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
+ break;
+ }
+ }
+}
+
+TEST_P(ClientAgentInteraction, ExistantEntitiesCreationNoReplaceXMLXMLReliable)
{
switch (std::get<1>(GetParam()))
{
@@ -416,7 +305,31 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationNoReplaceReliable)
}
}
-TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReuseReliable)
+TEST_P(ClientAgentInteraction, ExistantEntitiesCreationNoReplaceBINBINReliable)
+{
+ switch (std::get<1>(GetParam()))
+ {
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ // Not implemented
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
+ break;
+ }
+ }
+}
+
+TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReuseXMLXMLReliable)
{
switch (std::get<1>(GetParam()))
{
@@ -441,6 +354,30 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReuseReliable)
}
}
+TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReuseBINBINReliable)
+{
+ switch (std::get<1>(GetParam()))
+ {
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ // Not implemented
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_bin(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
+ break;
+ }
+ }
+}
+
TEST_P(ClientAgentInteraction, PingFromClientToAgent)
{
const Transport transport_kind(std::get<0>(GetParam()));
diff --git a/test/test/client_agent/ClientAgentInteraction.hpp b/test/test/client_agent/ClientAgentInteraction.hpp
new file mode 100644
index 0000000..9b241c0
--- /dev/null
+++ b/test/test/client_agent/ClientAgentInteraction.hpp
@@ -0,0 +1,232 @@
+#ifndef IN_TEST_CLIENT_INT_HPP
+#define IN_TEST_CLIENT_INT_HPP
+
+#ifdef _WIN32
+#include
+#include
+#include
+#include
+#else
+#include
+#include
+#include
+#include
+#endif
+
+#include
+
+class Agent
+{
+public:
+ Agent(Transport transport,
+ MiddlewareKind middleware,
+ const uint16_t port)
+ : transport_(transport)
+ , middleware_{}
+ , port_(port)
+ {
+ switch (middleware)
+ {
+ case MiddlewareKind::FASTDDS:
+ middleware_ = eprosima::uxr::Middleware::Kind::FASTDDS;
+ break;
+ case MiddlewareKind::FASTRTPS:
+ middleware_ = eprosima::uxr::Middleware::Kind::FASTRTPS;
+ break;
+ case MiddlewareKind::CED:
+ middleware_ = eprosima::uxr::Middleware::Kind::CED;
+ break;
+ }
+ }
+
+ ~Agent()
+ {}
+
+ void start()
+ {
+ switch(transport_)
+ {
+ case Transport::UDP_IPV4_TRANSPORT:
+ {
+ agent_udp4_.reset(new eprosima::uxr::UDPv4Agent(port_, middleware_));
+ agent_udp4_->set_verbose_level(6);
+ ASSERT_TRUE(agent_udp4_->start());
+ break;
+ }
+ case Transport::UDP_IPV6_TRANSPORT:
+ {
+ agent_udp6_.reset(new eprosima::uxr::UDPv6Agent(port_, middleware_));
+ agent_udp6_->set_verbose_level(6);
+ ASSERT_TRUE(agent_udp6_->start());
+ break;
+ }
+ case Transport::TCP_IPV4_TRANSPORT:
+ {
+ agent_tcp4_.reset(new eprosima::uxr::TCPv4Agent(port_, middleware_));
+ agent_tcp4_->set_verbose_level(6);
+ ASSERT_TRUE(agent_tcp4_->start());
+ break;
+ }
+ case Transport::TCP_IPV6_TRANSPORT:
+ {
+ agent_tcp6_.reset(new eprosima::uxr::TCPv6Agent(port_, middleware_));
+ agent_tcp6_->set_verbose_level(6);
+ ASSERT_TRUE(agent_tcp6_->start());
+ break;
+ }
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ {
+ try
+ {
+ agent_custom_endpoint_.add_member("index");
+ }
+ catch(const std::exception& /*e*/)
+ {
+ // Do nothing
+ }
+
+
+ agent_custom_.reset(new eprosima::uxr::CustomAgent(
+ "custom_agent",
+ &agent_custom_endpoint_,
+ middleware_,
+ false,
+ agent_custom_transport_open,
+ agent_custom_transport_close,
+ agent_custom_transport_write_packet,
+ agent_custom_transport_read_packet));
+ agent_custom_->set_verbose_level(6);
+ ASSERT_TRUE(agent_custom_->start());
+ break;
+ }
+ case Transport::CUSTOM_WITH_FRAMING:
+ {
+ try
+ {
+ agent_custom_endpoint_.add_member("index");
+ }
+ catch(const std::exception& /*e*/)
+ {
+ // Do nothing
+ }
+
+ agent_custom_.reset(new eprosima::uxr::CustomAgent(
+ "custom_agent",
+ &agent_custom_endpoint_,
+ middleware_,
+ true,
+ agent_custom_transport_open,
+ agent_custom_transport_close,
+ agent_custom_transport_write_stream,
+ agent_custom_transport_read_stream));
+ agent_custom_->set_verbose_level(6);
+ ASSERT_TRUE(agent_custom_->start());
+ break;
+ }
+
+ }
+ }
+
+ void stop()
+ {
+ switch(transport_)
+ {
+ case Transport::UDP_IPV4_TRANSPORT:
+ {
+ ASSERT_TRUE(agent_udp4_->stop());
+ break;
+ }
+ case Transport::UDP_IPV6_TRANSPORT:
+ {
+ ASSERT_TRUE(agent_udp6_->stop());
+ break;
+ }
+ case Transport::TCP_IPV4_TRANSPORT:
+ {
+ ASSERT_TRUE(agent_tcp4_->stop());
+ break;
+ }
+ case Transport::TCP_IPV6_TRANSPORT:
+ {
+ ASSERT_TRUE(agent_tcp6_->stop());
+ break;
+ }
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ case Transport::CUSTOM_WITH_FRAMING:
+ {
+ ASSERT_TRUE(agent_custom_->stop());
+ break;
+ }
+ }
+ }
+
+private:
+ Transport transport_;
+ std::unique_ptr agent_udp4_;
+ std::unique_ptr agent_udp6_;
+ std::unique_ptr agent_tcp4_;
+ std::unique_ptr agent_tcp6_;
+ std::unique_ptr agent_custom_;
+ eprosima::uxr::CustomEndPoint agent_custom_endpoint_;
+
+ eprosima::uxr::Middleware::Kind middleware_;
+ uint16_t port_;
+};
+
+class ClientAgentInteraction : public ::testing::TestWithParam>
+{
+public:
+ const uint16_t AGENT_PORT = 2018 + uint16_t(std::get<0>(GetParam()));
+ const float LOST = 0.1f;
+ static const uint8_t INIT_CLOSE_RETRIES = 20;
+
+ ClientAgentInteraction()
+ : transport_(std::get<0>(GetParam()))
+ , client_(0.0f, 8)
+ , agent_(transport_, (MiddlewareKind) std::get<1>(GetParam()), AGENT_PORT)
+ {}
+
+ ~ClientAgentInteraction()
+ {
+ }
+
+ void SetUp() override
+ {
+ agent_.start();
+ switch (transport_)
+ {
+ case Transport::UDP_IPV4_TRANSPORT:
+ case Transport::TCP_IPV4_TRANSPORT:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.init_transport(transport_, "127.0.0.1", std::to_string(AGENT_PORT).c_str()));
+ break;
+ }
+ case Transport::UDP_IPV6_TRANSPORT:
+ case Transport::TCP_IPV6_TRANSPORT:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.init_transport(transport_, "::1", std::to_string(AGENT_PORT).c_str()));
+ break;
+ }
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ case Transport::CUSTOM_WITH_FRAMING:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.init_transport(transport_, NULL, NULL));
+ break;
+ }
+ }
+ }
+
+ void TearDown() override
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.close_transport(transport_));
+ agent_.stop();
+ }
+
+
+protected:
+ Transport transport_;
+ Client client_;
+ Agent agent_;
+};
+
+#endif // ifndef IN_TEST_CLIENT_INT_HPP
\ No newline at end of file
diff --git a/test/test/client_agent/ClientAgentSerial.cpp b/test/test/client_agent/ClientAgentSerial.cpp
new file mode 100644
index 0000000..b277dd7
--- /dev/null
+++ b/test/test/client_agent/ClientAgentSerial.cpp
@@ -0,0 +1,47 @@
+#include
+#include
+#include
+
+#include "ClientAgentSerial.hpp"
+
+TEST_P(ClientAgentSerial, PingFromClientToAgent)
+{
+ const Transport transport_kind(std::get<0>(GetParam()));
+
+ switch (transport_kind)
+ {
+ case Transport::SERIAL_TRANSPORT:
+ ASSERT_NO_FATAL_FAILURE(client_serial_.ping_agent(transport_kind));
+ break;
+
+ case Transport::MULTISERIAL_TRANSPORT:
+ {
+ std::vector ping_thr;
+ for (auto & element : clients_multiserial_)
+ {
+ std::thread ping_thread(&ClientSerial::ping_agent, &element, transport_kind);
+ ping_thr.push_back(std::move(ping_thread));
+ }
+
+ for (auto & thr : ping_thr)
+ {
+ if (thr.joinable())
+ {
+ thr.join();
+ }
+ }
+
+ break;
+ }
+
+ default:
+ break;
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(
+ SerialTransports,
+ ClientAgentSerial,
+ ::testing::Combine(
+ ::testing::Values(Transport::SERIAL_TRANSPORT, Transport::MULTISERIAL_TRANSPORT),
+ ::testing::Values(MiddlewareKind::FASTDDS, MiddlewareKind::FASTRTPS, MiddlewareKind::CED)));
\ No newline at end of file
diff --git a/test/test/client_agent/ClientAgentSerial.hpp b/test/test/client_agent/ClientAgentSerial.hpp
new file mode 100644
index 0000000..4443032
--- /dev/null
+++ b/test/test/client_agent/ClientAgentSerial.hpp
@@ -0,0 +1,193 @@
+#ifndef IN_TEST_CLIENTSERIAL_INT_HPP
+#define IN_TEST_CLIENTSERIAL_INT_HPP
+
+#include
+#include
+#include
+#include
+
+class AgentSerial
+{
+public:
+ const char * baudrate = "115200";
+ const char * port_name = "/dev/ptmx";
+ const int client_number = 5;
+
+ AgentSerial(Transport transport,
+ MiddlewareKind middleware)
+ : transport_(transport)
+ , middleware_{}
+ {
+ switch (middleware)
+ {
+ case MiddlewareKind::FASTDDS:
+ middleware_ = eprosima::uxr::Middleware::Kind::FASTDDS;
+ break;
+ case MiddlewareKind::FASTRTPS:
+ middleware_ = eprosima::uxr::Middleware::Kind::FASTRTPS;
+ break;
+ case MiddlewareKind::CED:
+ middleware_ = eprosima::uxr::Middleware::Kind::CED;
+ break;
+ }
+ }
+
+ ~AgentSerial()
+ {}
+
+ void start()
+ {
+ switch(transport_)
+ {
+ case Transport::SERIAL_TRANSPORT:
+ {
+ struct termios attr = ClientSerial::init_termios(baudrate);
+ agent_serial_.reset(new eprosima::uxr::TermiosAgent(port_name, O_RDWR | O_NOCTTY, attr, 0, middleware_));
+ agent_serial_->set_verbose_level(6);
+ ASSERT_TRUE(agent_serial_->start());
+ break;
+ }
+ case Transport::MULTISERIAL_TRANSPORT:
+ {
+ struct termios attr = ClientSerial::init_termios(baudrate);
+
+ std::vector devs;
+ for (size_t i = 0; i < client_number; i++)
+ {
+ devs.push_back(port_name);
+ }
+
+ agent_multiserial_.reset(new eprosima::uxr::MultiTermiosAgent(devs, O_RDWR | O_NOCTTY, attr, 0, middleware_));
+ ASSERT_TRUE(agent_multiserial_->start());
+ break;
+ }
+ }
+ }
+
+ void stop()
+ {
+ switch(transport_)
+ {
+ case Transport::SERIAL_TRANSPORT:
+ {
+ ASSERT_TRUE(agent_serial_->stop());
+ break;
+ }
+ case Transport::MULTISERIAL_TRANSPORT:
+ {
+ ASSERT_TRUE(agent_multiserial_->stop());
+ break;
+ }
+ }
+ }
+
+ int getfd()
+ {
+ return agent_serial_->getfd();
+ }
+
+ std::vector getfd_multi()
+ {
+ return agent_multiserial_->getfds();
+ }
+
+ bool wait_multiserial_open()
+ {
+ while (getfd_multi().size() != client_number)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+
+ return true;
+ }
+
+private:
+ Transport transport_;
+ std::unique_ptr agent_serial_;
+ std::unique_ptr agent_multiserial_;
+
+ eprosima::uxr::Middleware::Kind middleware_;
+};
+
+class ClientAgentSerial : public ::testing::TestWithParam>
+{
+public:
+
+ ClientAgentSerial()
+ : client_serial_(0.0f, 8)
+ , transport_(std::get<0>(GetParam()))
+ , clients_multiserial_{}
+ , agent_(transport_, (MiddlewareKind) std::get<1>(GetParam()))
+ {
+ for (size_t i = 0; i < agent_.client_number; i++)
+ {
+ ClientSerial auxcli(0.0f, 8);
+ clients_multiserial_.push_back(auxcli);
+ }
+ }
+
+ ~ClientAgentSerial()
+ {}
+
+ void SetUp() override
+ {
+ agent_.start();
+
+ switch (transport_)
+ {
+ case Transport::SERIAL_TRANSPORT:
+ {
+ int masterfd = agent_.getfd();
+ grantpt(masterfd);
+ unlockpt(masterfd);
+ ASSERT_NO_FATAL_FAILURE(client_serial_.init_transport(transport_, ptsname(masterfd), NULL));
+ break;
+ }
+ case Transport::MULTISERIAL_TRANSPORT:
+ {
+ agent_.wait_multiserial_open();
+ std::vector masterfd = agent_.getfd_multi();
+
+ for (size_t i = 0; i < masterfd.size(); i++)
+ {
+ grantpt(masterfd[i]);
+ unlockpt(masterfd[i]);
+ ASSERT_NO_FATAL_FAILURE(clients_multiserial_[i].init_transport(transport_, ptsname(masterfd[i]), NULL));
+ }
+
+ break;
+ }
+ }
+ }
+
+ void TearDown() override
+ {
+ switch (transport_)
+ {
+ case Transport::SERIAL_TRANSPORT:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_serial_.close_transport(transport_));
+ break;
+ }
+ case Transport::MULTISERIAL_TRANSPORT:
+ {
+ for (size_t i = 0; i < clients_multiserial_.size(); i++)
+ {
+ ASSERT_NO_FATAL_FAILURE(clients_multiserial_[i].close_transport(transport_));
+ }
+
+ }
+ }
+
+ agent_.stop();
+ }
+
+protected:
+ Transport transport_;
+ AgentSerial agent_;
+
+ std::vector clients_multiserial_;
+ ClientSerial client_serial_;
+};
+
+#endif // ifndef IN_TEST_CLIENTSERIAL_INT_HPP
\ No newline at end of file
diff --git a/test/test/discovery/DiscoveryIntegration.cpp b/test/test/discovery/DiscoveryIntegration.cpp
index 92bd3be..f158836 100644
--- a/test/test/discovery/DiscoveryIntegration.cpp
+++ b/test/test/discovery/DiscoveryIntegration.cpp
@@ -151,12 +151,13 @@ class DiscoveryIntegration : public ::testing::TestWithParam> agents_tcp6_;
};
-TEST_P(DiscoveryIntegration, DiscoveryUnicast)
-{
- std::vector discovery_locators = init_scenario(4);
- std::this_thread::sleep_for(std::chrono::seconds(1));
- discovery_->unicast(discovery_locators);
-}
+// TODO(pablogs): Fix this test, randomly fails. Maybe a timeout?
+// TEST_P(DiscoveryIntegration, DiscoveryUnicast)
+// {
+// std::vector discovery_locators = init_scenario(4);
+// std::this_thread::sleep_for(std::chrono::seconds(1));
+// discovery_->unicast(discovery_locators);
+// }
TEST_P(DiscoveryIntegration, DiscoveryMulticast)
{
diff --git a/test/test/interaction_client/CMakeLists.txt b/test/test/interaction_client/CMakeLists.txt
index 1514a7f..c3c37fd 100644
--- a/test/test/interaction_client/CMakeLists.txt
+++ b/test/test/interaction_client/CMakeLists.txt
@@ -22,6 +22,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../common/EntitiesInfo.hpp.in
)
set(SRCS
+ Client.cpp
Gateway.cpp
BigHelloWorld.c
Discovery.cpp
diff --git a/test/test/interaction_client/Client.cpp b/test/test/interaction_client/Client.cpp
new file mode 100644
index 0000000..89360cf
--- /dev/null
+++ b/test/test/interaction_client/Client.cpp
@@ -0,0 +1,9 @@
+#include "Client.hpp"
+
+uint32_t Client::next_client_key_ = 0;
+
+bool flush_session(uxrSession* session, void * args){
+ (void) args;
+
+ return uxr_run_session_until_confirm_delivery(session, 1000);
+}
\ No newline at end of file
diff --git a/test/test/interaction_client/Client.hpp b/test/test/interaction_client/Client.hpp
index a167b9d..6ec9df5 100644
--- a/test/test/interaction_client/Client.hpp
+++ b/test/test/interaction_client/Client.hpp
@@ -6,6 +6,7 @@
#include
#include <../custom_transports/Custom_transports.hpp>
+#include
#include
#include
#include
@@ -20,10 +21,20 @@ enum class Transport
UDP_IPV6_TRANSPORT,
TCP_IPV4_TRANSPORT,
TCP_IPV6_TRANSPORT,
+ CAN_TRANSPORT,
+ SERIAL_TRANSPORT,
+ MULTISERIAL_TRANSPORT,
CUSTOM_WITH_FRAMING,
CUSTOM_WITHOUT_FRAMING
};
+enum class XRCECreationMode
+{
+ XRCE_XML_CREATION,
+ XRCE_BIN_CREATION,
+ XRCE_REF_CREATION
+};
+
inline bool operator == (const uxrObjectId& obj1, const uxrObjectId& obj2)
{
return obj1.id == obj2.id
@@ -38,9 +49,7 @@ inline bool operator == (const uxrStreamId& s1, const uxrStreamId& s2)
&& s1.direction == s2.direction;
}
-extern "C" bool flush_session(uxrSession* session){
- return uxr_run_session_until_confirm_delivery(session, 1000);
-}
+extern "C" bool flush_session(uxrSession* session, void * args);
class Client
{
@@ -188,6 +197,79 @@ class Client
ASSERT_EQ(request_id, last_status_request_id_);
}
+ template
+ void create_entities_bin(uint8_t id, uint8_t stream_id_raw, uint8_t expected_status, uint8_t flags)
+ {
+ uxrStreamId output_stream_id = uxr_stream_id_from_raw(stream_id_raw, UXR_OUTPUT_STREAM);
+ uint16_t request_id; uint8_t status;
+
+ uxrObjectId participant_id = uxr_object_id(id, UXR_PARTICIPANT_ID);
+ request_id =
+ uxr_buffer_create_participant_bin(
+ &session_, output_stream_id, participant_id, 0, "participant_name", flags);
+ ASSERT_NE(UXR_INVALID_REQUEST_ID, request_id);
+ uxr_run_session_until_all_status(&session_, timeout, &request_id, &status, 1);
+ ASSERT_EQ(expected_status, status);
+ ASSERT_EQ(expected_status, last_status_);
+ ASSERT_EQ(participant_id, last_status_object_id_);
+ ASSERT_EQ(request_id, last_status_request_id_);
+
+ uxrObjectId topic_id = uxr_object_id(id, UXR_TOPIC_ID);
+ request_id =
+ uxr_buffer_create_topic_bin(
+ &session_, output_stream_id, topic_id, participant_id, "topicname", "topictype", flags);
+ ASSERT_NE(UXR_INVALID_REQUEST_ID, request_id);
+ uxr_run_session_until_all_status(&session_, timeout, &request_id, &status, 1);
+ ASSERT_EQ(expected_status, status);
+ ASSERT_EQ(expected_status, last_status_);
+ ASSERT_EQ(topic_id, last_status_object_id_);
+ ASSERT_EQ(request_id, last_status_request_id_);
+
+ uxrObjectId publisher_id = uxr_object_id(id, UXR_PUBLISHER_ID);
+ request_id =
+ uxr_buffer_create_publisher_bin(
+ &session_, output_stream_id, publisher_id, participant_id, flags);
+ ASSERT_NE(UXR_INVALID_REQUEST_ID, request_id);
+ uxr_run_session_until_all_status(&session_, timeout, &request_id, &status, 1);
+ ASSERT_EQ(expected_status, status);
+ ASSERT_EQ(expected_status, last_status_);
+ ASSERT_EQ(publisher_id, last_status_object_id_);
+ ASSERT_EQ(request_id, last_status_request_id_);
+
+ uxrQoS_t qos = {UXR_DURABILITY_TRANSIENT_LOCAL, UXR_RELIABILITY_RELIABLE, UXR_HISTORY_KEEP_ALL, 0};
+
+ uxrObjectId datawriter_id = uxr_object_id(id, UXR_DATAWRITER_ID);
+ request_id =
+ uxr_buffer_create_datawriter_bin(&session_, output_stream_id, datawriter_id, publisher_id, topic_id, qos, flags);
+ ASSERT_NE(UXR_INVALID_REQUEST_ID, request_id);
+ uxr_run_session_until_all_status(&session_, timeout, &request_id, &status, 1);
+ ASSERT_EQ(expected_status, status);
+ ASSERT_EQ(expected_status, last_status_);
+ ASSERT_EQ(datawriter_id, last_status_object_id_);
+ ASSERT_EQ(request_id, last_status_request_id_);
+
+ uxrObjectId subscriber_id = uxr_object_id(id, UXR_SUBSCRIBER_ID);
+ request_id =
+ uxr_buffer_create_subscriber_bin(
+ &session_, output_stream_id, subscriber_id, participant_id, flags);
+ ASSERT_NE(UXR_INVALID_REQUEST_ID, request_id);
+ uxr_run_session_until_all_status(&session_, timeout, &request_id, &status, 1);
+ ASSERT_EQ(expected_status, status);
+ ASSERT_EQ(expected_status, last_status_);
+ ASSERT_EQ(subscriber_id, last_status_object_id_);
+ ASSERT_EQ(request_id, last_status_request_id_);
+
+ uxrObjectId datareader_id = uxr_object_id(id, UXR_DATAREADER_ID);
+ request_id =
+ uxr_buffer_create_datareader_bin(&session_, output_stream_id, datareader_id, subscriber_id, topic_id, qos, flags);
+ ASSERT_NE(UXR_INVALID_REQUEST_ID, request_id);
+ uxr_run_session_until_all_status(&session_, timeout, &request_id, &status, 1);
+ ASSERT_EQ(expected_status, status);
+ ASSERT_EQ(expected_status, last_status_);
+ ASSERT_EQ(datareader_id, last_status_object_id_);
+ ASSERT_EQ(request_id, last_status_request_id_);
+ }
+
void publish(uint8_t id, uint8_t stream_id_raw, size_t number, const std::string& message)
{
//Used only for waiting the RTPS subscriber matching
@@ -210,7 +292,7 @@ class Client
{
prepared = uxr_prepare_output_stream(&session_, output_stream_id, datawriter_id, &ub, topic_size);
} else {
- prepared = uxr_prepare_output_stream_fragmented(&session_, output_stream_id, datawriter_id, &ub, topic_size, flush_session);
+ prepared = uxr_prepare_output_stream_fragmented(&session_, output_stream_id, datawriter_id, &ub, topic_size, flush_session, NULL);
}
ASSERT_NE(prepared, UXR_INVALID_REQUEST_ID);
bool written = BigHelloWorld_serialize_topic(&ub, &topic);
@@ -218,7 +300,6 @@ class Client
ASSERT_FALSE(ub.error);
bool sent = uxr_run_session_until_confirm_delivery(&session_, timeout);
ASSERT_TRUE(sent);
- std::cout << "topic sent: " << i << std::endl;
}
}
@@ -254,9 +335,35 @@ class Client
ASSERT_EQ(UXR_STATUS_OK, last_status_);
ASSERT_EQ(datareader_id, last_status_object_id_);
ASSERT_EQ(request_id, last_status_request_id_);
+ std::cout << "msg " << expected_topic_index_ << " received." << std::endl;
}
}
+ void request_data(uint8_t id, uint8_t stream_id_raw, const std::string& message)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+ expected_message_ = message;
+ expected_topic_index_ = 0;
+ last_topic_stream_id_ = uxr_stream_id_from_raw(0, UXR_OUTPUT_STREAM);
+ last_topic_object_id_ = uxr_object_id(255, 15);
+
+ uxrStreamId output_stream_id = uxr_stream_id(0, UXR_RELIABLE_STREAM, UXR_OUTPUT_STREAM);
+ uxrStreamId input_stream_id = uxr_stream_id_from_raw(stream_id_raw, UXR_INPUT_STREAM);
+ uxrObjectId datareader_id = uxr_object_id(id, UXR_DATAREADER_ID);
+
+ uxrDeliveryControl delivery_control = {};
+ delivery_control.max_samples = UXR_MAX_SAMPLES_UNLIMITED;
+ uint16_t request_id = uxr_buffer_request_data(&session_, output_stream_id, datareader_id, input_stream_id, &delivery_control);
+ ASSERT_NE(UXR_INVALID_REQUEST_ID, request_id);
+
+ uxr_flash_output_streams(&session_);
+ }
+
+ size_t get_received_topics()
+ {
+ return expected_topic_index_;
+ }
+
void init_transport(Transport transport, const char* ip, const char* port)
{
switch(transport)
@@ -309,6 +416,9 @@ class Client
ASSERT_TRUE(uxr_init_custom_transport(&custom_transport_, NULL));
uxr_init_session(&session_, gateway_.monitorize(&custom_transport_.comm), client_key_);
break;
+ default:
+ FAIL() << "Transport type not supported";
+ break;
}
init_common();
@@ -341,6 +451,9 @@ class Client
case Transport::CUSTOM_WITH_FRAMING:
ASSERT_TRUE(uxr_close_custom_transport(&custom_transport_));
break;
+ default:
+ FAIL() << "Transport type not supported";
+ break;
}
}
@@ -374,15 +487,27 @@ class Client
comm = &custom_transport_.comm;
break;
}
+ default:
+ FAIL() << "Transport type not supported";
+ break;
}
ASSERT_TRUE(uxr_ping_agent_attempts(comm, 1000, 1));
}
-private:
+ void ping_agent_session()
+ {
+ ASSERT_TRUE(uxr_ping_agent_session(&session_, 1000, 1));
+ }
+
+protected:
void init_common()
{
- /* Setup callback. */
- uxr_set_topic_callback(&session_, on_topic_dispatcher, this);
+ if (session_.on_topic == NULL)
+ {
+ /* Setup callback. */
+ uxr_set_topic_callback(&session_, on_topic_dispatcher, this);
+ }
+
uxr_set_status_callback(&session_, on_status_dispatcher, this);
/* Create session. */
@@ -422,6 +547,11 @@ class Client
static_cast(args)->on_topic(session_, object_id, request_id, stream_id, serialization, length);
}
+ static void on_topic_multi_dispatcher(uxrSession* session_, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* serialization, uint16_t length, void* args)
+ {
+ static_cast(args)->on_topic_multi(session_, object_id, request_id, stream_id, serialization, length);
+ }
+
void on_topic(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* serialization, uint16_t length)
{
(void) session;
@@ -440,6 +570,21 @@ class Client
std::cout << "topic received: " << topic.index << std::endl;
}
+ void on_topic_multi(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* serialization, uint16_t length)
+ {
+ (void) session;
+ (void) length;
+
+ BigHelloWorld topic;
+ BigHelloWorld_deserialize_topic(serialization, &topic);
+
+ ASSERT_STREQ(expected_message_.c_str(), topic.message);
+ last_topic_object_id_ = object_id;
+ last_topic_stream_id_ = stream_id;
+ last_topic_request_id_ = request_id;
+ expected_topic_index_++;
+ }
+
static void on_status_dispatcher(uxrSession* session_, uxrObjectId object_id, uint16_t request_id, uint8_t status, void* args)
{
static_cast(args)->on_status(session_, object_id, request_id, status);
@@ -484,6 +629,4 @@ class Client
size_t expected_topic_index_;
};
-uint32_t Client::next_client_key_ = 0;
-
#endif //IN_TEST_CLIENT_HPP
diff --git a/test/test/interaction_client/ClientCan.hpp b/test/test/interaction_client/ClientCan.hpp
new file mode 100644
index 0000000..96ff053
--- /dev/null
+++ b/test/test/interaction_client/ClientCan.hpp
@@ -0,0 +1,57 @@
+
+#ifndef IN_TEST_CLIENTCAN_HPP
+#define IN_TEST_CLIENTCAN_HPP
+
+#include "Client.hpp"
+
+#include
+#include
+#include
+#include
+
+class ClientCan : public Client
+{
+public:
+ ClientCan(float lost, uint16_t history)
+ : Client(lost, history)
+ {
+ }
+
+ virtual ~ClientCan()
+ {}
+
+ void init_transport(const char* dev, const uint32_t can_id)
+ {
+ mtu_ = UXR_CONFIG_CAN_TRANSPORT_MTU;
+ ASSERT_TRUE(uxr_init_can_transport(&can_transport_, dev, can_id));
+ uxr_init_session(&session_, gateway_.monitorize(&can_transport_.comm), client_key_);
+ init_common();
+ }
+
+ void close_transport()
+ {
+ // Flash incomming messages.
+ uxr_run_session_time(&session_, 100);
+
+ bool deleted = uxr_delete_session(&session_);
+
+ if(0.0f == gateway_.get_lost_value()) //because the agent only send one status to a delete in stream 0.
+ {
+ EXPECT_TRUE(deleted);
+ EXPECT_EQ(UXR_STATUS_OK, session_.info.last_requested_status);
+ }
+
+ ASSERT_TRUE(uxr_close_can_transport(&can_transport_));
+ }
+
+ void ping_agent()
+ {
+ uxrCommunication* comm = &can_transport_.comm;
+ ASSERT_TRUE(uxr_ping_agent_attempts(comm, 1000, 1));
+ }
+
+private:
+ uxrCANTransport can_transport_;
+};
+
+#endif //IN_TEST_CLIENTCAN_HPP
diff --git a/test/test/interaction_client/ClientSerial.hpp b/test/test/interaction_client/ClientSerial.hpp
new file mode 100644
index 0000000..ec8d603
--- /dev/null
+++ b/test/test/interaction_client/ClientSerial.hpp
@@ -0,0 +1,112 @@
+
+#ifndef IN_TEST_CLIENTSERIAL_HPP
+#define IN_TEST_CLIENTSERIAL_HPP
+
+#include "Client.hpp"
+
+#include
+#include
+#include
+#include
+
+#include
+
+class ClientSerial : public Client
+{
+public:
+ ClientSerial(float lost, uint16_t history)
+ : Client(lost, history)
+ {
+ }
+
+ virtual ~ClientSerial()
+ {}
+
+ static termios init_termios(const char * baudrate_str)
+ {
+ struct termios attr = {};
+
+ /* Setting CONTROL OPTIONS. */
+ attr.c_cflag |= unsigned(CREAD); // Enable read.
+ attr.c_cflag |= unsigned(CLOCAL); // Set local mode.
+ attr.c_cflag &= unsigned(~PARENB); // Disable parity.
+ attr.c_cflag &= unsigned(~CSTOPB); // Set one stop bit.
+ attr.c_cflag &= unsigned(~CSIZE); // Mask the character size bits.
+ attr.c_cflag |= unsigned(CS8); // Set 8 data bits.
+ attr.c_cflag &= unsigned(~CRTSCTS); // Disable hardware flow control.
+
+ /* Setting LOCAL OPTIONS. */
+ attr.c_lflag &= unsigned(~ICANON); // Set non-canonical input.
+ attr.c_lflag &= unsigned(~ECHO); // Disable echoing of input characters.
+ attr.c_lflag &= unsigned(~ECHOE); // Disable echoing the erase character.
+ attr.c_lflag &= unsigned(~ISIG); // Disable SIGINTR, SIGSUSP, SIGDSUSP and SIGQUIT signals.
+
+ /* Setting INPUT OPTIONS. */
+ attr.c_iflag &= unsigned(~IXON); // Disable output software flow control.
+ attr.c_iflag &= unsigned(~IXOFF); // Disable input software flow control.
+ attr.c_iflag &= unsigned(~INPCK); // Disable parity check.
+ attr.c_iflag &= unsigned(~ISTRIP); // Disable strip parity bits.
+ attr.c_iflag &= unsigned(~IGNBRK); // No ignore break condition.
+ attr.c_iflag &= unsigned(~IGNCR); // No ignore carrier return.
+ attr.c_iflag &= unsigned(~INLCR); // No map NL to CR.
+ attr.c_iflag &= unsigned(~ICRNL); // No map CR to NL.
+
+ /* Setting OUTPUT OPTIONS. */
+ attr.c_oflag &= unsigned(~OPOST); // Set raw output.
+
+ /* Setting OUTPUT CHARACTERS. */
+ attr.c_cc[VMIN] = 10;
+ attr.c_cc[VTIME] = 1;
+
+ /* Setting baudrate. */
+ speed_t baudrate = getBaudRate(baudrate_str);
+ attr.c_ispeed = baudrate;
+ attr.c_ospeed = baudrate;
+
+ return attr;
+ }
+
+ void init_transport(Transport transport, const char* ip, const char* port)
+ {
+ (void) transport;
+ (void) port;
+ mtu_ = UXR_CONFIG_CUSTOM_TRANSPORT_MTU;
+ int fd_ = open(ip, O_RDWR | O_NONBLOCK);
+ ASSERT_TRUE(uxr_init_serial_transport(&serial_transport_, fd_, 0x00, 0x00));
+ uxr_init_session(&session_, gateway_.monitorize(&serial_transport_.comm), client_key_);
+
+ uxr_set_topic_callback(&session_, on_topic_multi_dispatcher, this);
+ init_common();
+ }
+
+ void close_transport(Transport transport)
+ {
+ (void) transport;
+
+ // Flash incomming messages.
+ uxr_run_session_time(&session_, 100);
+
+ bool deleted = uxr_delete_session(&session_);
+
+ if(0.0f == gateway_.get_lost_value()) //because the agent only send one status to a delete in stream 0.
+ {
+ EXPECT_TRUE(deleted);
+ EXPECT_EQ(UXR_STATUS_OK, session_.info.last_requested_status);
+ }
+
+ ASSERT_TRUE(uxr_close_serial_transport(&serial_transport_));
+ }
+
+ void ping_agent(
+ const Transport transport_kind)
+ {
+ (void) transport_kind;
+ uxrCommunication* comm = &serial_transport_.comm;
+ ASSERT_TRUE(uxr_ping_agent_attempts(comm, 1000, 1));
+ }
+
+private:
+ uxrSerialTransport serial_transport_;
+};
+
+#endif //IN_TEST_CLIENTSERIAL_HPP
diff --git a/test/test/publisher_subscriber/CMakeLists.txt b/test/test/publisher_subscriber/CMakeLists.txt
index 8a59635..317445c 100644
--- a/test/test/publisher_subscriber/CMakeLists.txt
+++ b/test/test/publisher_subscriber/CMakeLists.txt
@@ -19,11 +19,18 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/DEFAULT_FASTRTPS_PROFILES.xml.in
@ONLY
)
-add_executable(publisher-subscriber-interaction-test PublisherSubscriberInteraction.cpp)
+set(SRCS PublisherSubscriberInteraction.cpp)
+
+if(NOT WIN32)
+ list(APPEND SRCS PublisherSubscriberSerial.cpp)
+ list(APPEND SRCS PublisherSubscriberCan.cpp)
+endif()
+
+add_executable(publisher-subscriber-interaction-test ${SRCS})
add_gtest(publisher-subscriber-interaction-test
SOURCES
- PublisherSubscriberInteraction.cpp
+ ${SRCS}
ENVIRONMENTS
$<$:LD_LIBRARY_PATH=${CMAKE_PREFIX_PATH}/lib>
$<$:PATH=${CMAKE_PREFIX_PATH}/bin>
diff --git a/test/test/publisher_subscriber/PublisherSubscriberCan.cpp b/test/test/publisher_subscriber/PublisherSubscriberCan.cpp
new file mode 100644
index 0000000..c1754ee
--- /dev/null
+++ b/test/test/publisher_subscriber/PublisherSubscriberCan.cpp
@@ -0,0 +1,146 @@
+#include
+#include
+
+#include
+#include "../client_agent/ClientAgentCan.hpp"
+
+class PubSubCan : public ClientCan
+{
+public:
+ PubSubCan(std::tuple parameters,
+ uint8_t id,
+ const char *dev,
+ const uint32_t can_id)
+ : ClientCan(std::get<1>(parameters), 16)
+ , middleware_(std::get<0>(parameters))
+ , creation_mode_(std::get<2>(parameters))
+ , id_(id)
+ , dev_(dev)
+ , can_id_(can_id)
+ {}
+
+ ~PubSubCan()
+ {}
+
+ void init()
+ {
+ ASSERT_NO_FATAL_FAILURE(ClientCan::init_transport(dev_, can_id_));
+
+ if (creation_mode_ == XRCECreationMode::XRCE_XML_CREATION)
+ {
+ switch (middleware_)
+ {
+ case MiddlewareKind::FASTDDS:
+ ASSERT_NO_FATAL_FAILURE(ClientCan::create_entities_xml(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::FASTRTPS:
+ ASSERT_NO_FATAL_FAILURE(ClientCan::create_entities_xml(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::CED:
+ ASSERT_NO_FATAL_FAILURE(ClientCan::create_entities_xml(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ }
+ else if (creation_mode_ == XRCECreationMode::XRCE_BIN_CREATION)
+ {
+ switch (middleware_)
+ {
+ case MiddlewareKind::FASTDDS:
+ ASSERT_NO_FATAL_FAILURE(ClientCan::create_entities_bin(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::CED:
+ ASSERT_NO_FATAL_FAILURE(ClientCan::create_entities_bin(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ default:
+ FAIL() << "Transport type not supported";
+ break;
+ }
+ }
+ }
+
+ void close()
+ {
+ ASSERT_NO_FATAL_FAILURE(ClientCan::close_transport());
+ }
+
+ uint8_t id_;
+
+private:
+ MiddlewareKind middleware_;
+ XRCECreationMode creation_mode_;
+ const char *dev_;
+ const uint32_t can_id_;
+};
+
+
+class PublisherSubscriberCan : public ::testing::TestWithParam>
+{
+public:
+ PublisherSubscriberCan()
+ : agent_((MiddlewareKind) std::get<1>(GetParam()))
+ , publisher_(GetParam(), 1, agent_.dev, agent_.can_id + 1)
+ , subscriber_(GetParam(), 1, agent_.dev, agent_.can_id + 2)
+ {
+ agent_.start();
+ }
+
+ ~PublisherSubscriberCan()
+ {
+ agent_.stop();
+ }
+
+ void SetUp() override
+ {
+ ASSERT_NO_FATAL_FAILURE(publisher_.init());
+ ASSERT_NO_FATAL_FAILURE(subscriber_.init());
+ }
+
+ void TearDown() override
+ {
+ ASSERT_NO_FATAL_FAILURE(publisher_.close());
+ ASSERT_NO_FATAL_FAILURE(subscriber_.close());
+ }
+
+ void check_messages(std::string message, size_t number, uint8_t stream_id_raw)
+ {
+ std::thread publisher_thread(&ClientCan::publish, &publisher_, publisher_.id_, stream_id_raw, number, message);
+ std::thread subscriber_thread(&ClientCan::subscribe, &subscriber_, subscriber_.id_, stream_id_raw, number, message);
+
+ publisher_thread.join();
+ subscriber_thread.join();
+ }
+
+protected:
+ AgentCan agent_;
+ PubSubCan publisher_;
+ PubSubCan subscriber_;
+ static const std::string SMALL_MESSAGE;
+};
+
+const std::string PublisherSubscriberCan::SMALL_MESSAGE("Hello DDS world!");
+
+TEST_P(PublisherSubscriberCan, PubSub10TopicsBestEffort)
+{
+ std::this_thread::sleep_for(std::chrono::seconds(2)); // Waiting for matching.
+ check_messages(SMALL_MESSAGE, 10, 0x01);
+}
+
+TEST_P(PublisherSubscriberCan, PubSub10TopicsReliable)
+{
+ std::this_thread::sleep_for(std::chrono::seconds(2)); // Waiting for matching.
+ check_messages(SMALL_MESSAGE, 10, 0x80);
+}
+
+TEST_P(PublisherSubscriberCan, PubSub1ContinousFragmentedTopic)
+{
+ std::string message(size_t(publisher_.get_mtu() * 8), 'A');
+ publisher_.publish(1, 0x80, 1, message);
+}
+
+INSTANTIATE_TEST_CASE_P(
+ PubSubCan,
+ PublisherSubscriberCan,
+ ::testing::Combine(
+ ::testing::Values(MiddlewareKind::FASTDDS),
+ ::testing::Values(0.0f),
+ ::testing::Values(XRCECreationMode::XRCE_XML_CREATION, XRCECreationMode::XRCE_BIN_CREATION)));
\ No newline at end of file
diff --git a/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp b/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp
index 67ee1dc..870a91d 100644
--- a/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp
+++ b/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp
@@ -1,149 +1,125 @@
#include
+#include
#include
-#ifdef _WIN32
-#include
-#include
-#include
-#include
-#else
-#include
-#include
-#include
-#include
-#endif
-
-#include
-
-#include <../custom_transports/Custom_transports.hpp>
-
-#include
+#include "../client_agent/ClientAgentInteraction.hpp"
-class PublisherSubscriberNoLost : public ::testing::TestWithParam>
+class PubSub : public Client
{
public:
- const uint16_t AGENT_PORT = 2018 + uint16_t(std::get<0>(this->GetParam()));
-
- PublisherSubscriberNoLost()
- : transport_(std::get<0>(GetParam()))
- , middleware_{}
- , publisher_(std::get<2>(GetParam()), 8)
- , subscriber_(std::get<2>(GetParam()), 8)
+ PubSub(std::tuple parameters,
+ const uint16_t AGENT_PORT,
+ uint8_t id)
+ : Client(std::get<2>(parameters), 8)
+ , transport_(std::get<0>(parameters))
+ , middleware_(std::get<1>(parameters))
+ , creation_mode_(std::get<3>(parameters))
+ , AGENT_PORT_(AGENT_PORT)
+ , id_(id)
{
- switch (std::get<1>(GetParam()))
- {
- case MiddlewareKind::FASTDDS:
- middleware_ = eprosima::uxr::Middleware::Kind::FASTDDS;
- break;
- case MiddlewareKind::FASTRTPS:
- middleware_ = eprosima::uxr::Middleware::Kind::FASTRTPS;
- break;
- case MiddlewareKind::CED:
- middleware_ = eprosima::uxr::Middleware::Kind::CED;
- break;
- }
- init_agent(AGENT_PORT);
}
- ~PublisherSubscriberNoLost()
+ ~PubSub()
{}
- void SetUp() override
+ void init()
{
- if (transport_ == Transport::UDP_IPV4_TRANSPORT || transport_ == Transport::TCP_IPV4_TRANSPORT)
+ switch(transport_)
{
- ASSERT_NO_FATAL_FAILURE(publisher_.init_transport(transport_, "127.0.0.1", std::to_string(AGENT_PORT).c_str()));
- ASSERT_NO_FATAL_FAILURE(subscriber_.init_transport(transport_, "127.0.0.1", std::to_string(AGENT_PORT).c_str()));
+ case Transport::UDP_IPV4_TRANSPORT:
+ case Transport::TCP_IPV4_TRANSPORT:
+ {
+ ASSERT_NO_FATAL_FAILURE(Client::init_transport(transport_, "127.0.0.1", std::to_string(AGENT_PORT_).c_str()));
+ break;
+ }
+ case Transport::UDP_IPV6_TRANSPORT:
+ case Transport::TCP_IPV6_TRANSPORT:
+ {
+ ASSERT_NO_FATAL_FAILURE(Client::init_transport(transport_, "::1", std::to_string(AGENT_PORT_).c_str()));
+ break;
+ }
+
+ case Transport::CUSTOM_WITH_FRAMING:
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ {
+ ASSERT_NO_FATAL_FAILURE(Client::init_transport(transport_, NULL, NULL));
+ break;
+ }
}
- else if (transport_ == Transport::UDP_IPV6_TRANSPORT || transport_ == Transport::TCP_IPV6_TRANSPORT)
+
+ if (creation_mode_ == XRCECreationMode::XRCE_XML_CREATION)
{
- ASSERT_NO_FATAL_FAILURE(publisher_.init_transport(transport_, "::1", std::to_string(AGENT_PORT).c_str()));
- ASSERT_NO_FATAL_FAILURE(subscriber_.init_transport(transport_, "::1", std::to_string(AGENT_PORT).c_str()));
+ switch (middleware_)
+ {
+ case MiddlewareKind::FASTDDS:
+ ASSERT_NO_FATAL_FAILURE(Client::create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::FASTRTPS:
+ ASSERT_NO_FATAL_FAILURE(Client::create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::CED:
+ ASSERT_NO_FATAL_FAILURE(Client::create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
}
- else if (transport_ == Transport::CUSTOM_WITH_FRAMING || transport_ == Transport::CUSTOM_WITHOUT_FRAMING)
+ else if (creation_mode_ == XRCECreationMode::XRCE_BIN_CREATION)
{
- ASSERT_NO_FATAL_FAILURE(publisher_.init_transport(transport_, NULL, NULL));
- ASSERT_NO_FATAL_FAILURE(subscriber_.init_transport(transport_, NULL, NULL));
+ switch (middleware_)
+ {
+ case MiddlewareKind::FASTDDS:
+ ASSERT_NO_FATAL_FAILURE(Client::create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::CED:
+ ASSERT_NO_FATAL_FAILURE(Client::create_entities_bin(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ default:
+ // Not supported
+ ASSERT_TRUE(0);
+ break;
+ }
}
+ }
- switch (std::get<1>(GetParam()))
- {
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(publisher_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(subscriber_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(publisher_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(subscriber_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(publisher_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(subscriber_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- break;
- }
+ void close()
+ {
+ Client::close_transport(transport_);
}
- void TearDown() override
+private:
+ const uint16_t AGENT_PORT_;
+ Transport transport_;
+ MiddlewareKind middleware_;
+ XRCECreationMode creation_mode_;
+ uint8_t id_;
+};
+
+class PublisherSubscriberNoLost : public ::testing::TestWithParam>
+{
+public:
+ const uint16_t AGENT_PORT = 2018 + uint16_t(std::get<0>(this->GetParam()));
+
+ PublisherSubscriberNoLost()
+ : transport_(std::get<0>(GetParam()))
+ , agent_(transport_, (MiddlewareKind) std::get<1>(GetParam()), AGENT_PORT)
+ , publisher_(GetParam(), AGENT_PORT, 1)
+ , subscriber_(GetParam(), AGENT_PORT, 1)
{
- ASSERT_NO_FATAL_FAILURE(publisher_.close_transport(transport_));
- ASSERT_NO_FATAL_FAILURE(subscriber_.close_transport(transport_));
+ agent_.start();
}
- void init_agent(uint16_t port)
+ ~PublisherSubscriberNoLost()
+ {}
+
+ void SetUp() override
{
- switch(transport_)
- {
- case Transport::UDP_IPV4_TRANSPORT:
- agent_udp4_.reset(new eprosima::uxr::UDPv4Agent(port, middleware_));
- agent_udp4_->set_verbose_level(6);
- ASSERT_TRUE(agent_udp4_->start());
- break;
- case Transport::UDP_IPV6_TRANSPORT:
- agent_udp6_.reset(new eprosima::uxr::UDPv6Agent(port, middleware_));
- agent_udp6_->set_verbose_level(6);
- ASSERT_TRUE(agent_udp6_->start());
- break;
- case Transport::TCP_IPV4_TRANSPORT:
- agent_tcp4_.reset(new eprosima::uxr::TCPv4Agent(port, middleware_));
- agent_tcp4_->set_verbose_level(6);
- ASSERT_TRUE(agent_tcp4_->start());
- break;
- case Transport::TCP_IPV6_TRANSPORT:
- agent_tcp6_.reset(new eprosima::uxr::TCPv6Agent(port, middleware_));
- agent_tcp6_->set_verbose_level(6);
- ASSERT_TRUE(agent_tcp6_->start());
- break;
- case Transport::CUSTOM_WITHOUT_FRAMING:
- agent_custom_endpoint_.add_member("index");
-
- agent_custom_.reset(new eprosima::uxr::CustomAgent(
- "custom_agent",
- &agent_custom_endpoint_,
- middleware_,
- false,
- agent_custom_transport_open,
- agent_custom_transport_close,
- agent_custom_transport_write_packet,
- agent_custom_transport_read_packet));
- agent_custom_->set_verbose_level(6);
- ASSERT_TRUE(agent_custom_->start());
- break;
- case Transport::CUSTOM_WITH_FRAMING:
- agent_custom_endpoint_.add_member("index");
-
- agent_custom_.reset(new eprosima::uxr::CustomAgent(
- "custom_agent",
- &agent_custom_endpoint_,
- middleware_,
- true,
- agent_custom_transport_open,
- agent_custom_transport_close,
- agent_custom_transport_write_stream,
- agent_custom_transport_read_stream));
- agent_custom_->set_verbose_level(6);
- ASSERT_TRUE(agent_custom_->start());
- break;
- }
+ publisher_.init();
+ subscriber_.init();
+ }
+
+ void TearDown() override
+ {
+ ASSERT_NO_FATAL_FAILURE(publisher_.close());
+ ASSERT_NO_FATAL_FAILURE(subscriber_.close());
}
void check_messages(std::string message, size_t number, uint8_t stream_id_raw)
@@ -157,16 +133,9 @@ class PublisherSubscriberNoLost : public ::testing::TestWithParam agent_udp4_;
- std::unique_ptr agent_udp6_;
- std::unique_ptr agent_tcp4_;
- std::unique_ptr agent_tcp6_;
- std::unique_ptr agent_custom_;
- eprosima::uxr::CustomEndPoint agent_custom_endpoint_;
-
- eprosima::uxr::Middleware::Kind middleware_;
- Client publisher_;
- Client subscriber_;
+ Agent agent_;
+ PubSub publisher_;
+ PubSub subscriber_;
static const std::string SMALL_MESSAGE;
};
@@ -204,6 +173,7 @@ TEST_P(PublisherSubscriberNoLost, PubSub1ContinousFragmentedTopic)
publisher_.publish(1, 0x80, 1, message);
}
+
// TODO (#4423) Fix the non-reliable behavior when messages is higher than the agent history to enable this
/*TEST_P(PublisherSubscriberNoLost, PubSub30TopicsReliable)
{
@@ -211,13 +181,53 @@ TEST_P(PublisherSubscriberNoLost, PubSub1ContinousFragmentedTopic)
check_messages(SMALL_MESSAGE, 30, 0x80);
}*/
+class PublisherSubscriberUnitary : public PublisherSubscriberNoLost {};
+
+TEST_P(PublisherSubscriberUnitary, PubSub1WithPing)
+{
+ size_t message_number = 1;
+ int64_t timeout = 10000;
+
+ publisher_.publish(1, 0x80, message_number, SMALL_MESSAGE);
+ subscriber_.request_data(1, 0x80, SMALL_MESSAGE);
+
+ int64_t start_time = uxr_millis();
+
+ while((uxr_millis() - start_time) < timeout && subscriber_.get_received_topics() != message_number){
+ subscriber_.ping_agent_session();
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ // Check number of topics received
+ ASSERT_EQ(subscriber_.get_received_topics(), message_number);
+}
+
+INSTANTIATE_TEST_CASE_P(
+ PublisherSubscriberWithPing,
+ PublisherSubscriberUnitary,
+ ::testing::Combine(
+ ::testing::Values(Transport::UDP_IPV4_TRANSPORT),
+ ::testing::Values(MiddlewareKind::FASTDDS),
+ ::testing::Values(0.0f),
+ ::testing::Values(XRCECreationMode::XRCE_BIN_CREATION)));
+
INSTANTIATE_TEST_CASE_P(
TransportAndLost,
PublisherSubscriberNoLost,
::testing::Combine(
::testing::Values(Transport::UDP_IPV4_TRANSPORT, Transport::UDP_IPV6_TRANSPORT, Transport::TCP_IPV4_TRANSPORT, Transport::TCP_IPV6_TRANSPORT),
::testing::Values(MiddlewareKind::FASTDDS, MiddlewareKind::FASTRTPS, MiddlewareKind::CED),
- ::testing::Values(0.0f)));
+ ::testing::Values(0.0f),
+ ::testing::Values(XRCECreationMode::XRCE_XML_CREATION)));
+
+INSTANTIATE_TEST_CASE_P(
+ TransportAndLostCreationModes,
+ PublisherSubscriberNoLost,
+ ::testing::Combine(
+ ::testing::Values(Transport::UDP_IPV4_TRANSPORT),
+ ::testing::Values(MiddlewareKind::FASTDDS, MiddlewareKind::CED),
+ ::testing::Values(0.0f),
+ ::testing::Values(XRCECreationMode::XRCE_XML_CREATION, XRCECreationMode::XRCE_BIN_CREATION)));
INSTANTIATE_TEST_CASE_P(
TransportAndLostCustomTransports,
@@ -225,7 +235,8 @@ INSTANTIATE_TEST_CASE_P(
::testing::Combine(
::testing::Values(Transport::CUSTOM_WITH_FRAMING, Transport::CUSTOM_WITHOUT_FRAMING),
::testing::Values(MiddlewareKind::FASTDDS),
- ::testing::Values(0.0f)));
+ ::testing::Values(0.0f),
+ ::testing::Values(XRCECreationMode::XRCE_XML_CREATION)));
TEST_P(PublisherSubscriberLost, PubSub1FragmentedTopic2Parts)
{
@@ -257,7 +268,8 @@ INSTANTIATE_TEST_CASE_P(
::testing::Combine(
::testing::Values(Transport::UDP_IPV4_TRANSPORT),
::testing::Values(MiddlewareKind::CED),
- ::testing::Values(0.05f, 0.1f)));
+ ::testing::Values(0.05f, 0.1f),
+ ::testing::Values(XRCECreationMode::XRCE_XML_CREATION)));
int main(int args, char** argv)
diff --git a/test/test/publisher_subscriber/PublisherSubscriberSerial.cpp b/test/test/publisher_subscriber/PublisherSubscriberSerial.cpp
new file mode 100644
index 0000000..3b2b6fc
--- /dev/null
+++ b/test/test/publisher_subscriber/PublisherSubscriberSerial.cpp
@@ -0,0 +1,172 @@
+#include
+#include
+
+#include
+#include "../client_agent/ClientAgentSerial.hpp"
+
+class PubSubSerial : public ClientSerial
+{
+public:
+ PubSubSerial(std::tuple parameters,
+ uint8_t id,
+ int masterfd)
+ : ClientSerial(std::get<2>(parameters), 8)
+ , transport_(std::get<0>(parameters))
+ , middleware_(std::get<1>(parameters))
+ , creation_mode_(std::get<3>(parameters))
+ , id_(id)
+ , masterfd_(masterfd)
+ {}
+
+ ~PubSubSerial()
+ {}
+
+ void init()
+ {
+ switch(transport_)
+ {
+ case Transport::SERIAL_TRANSPORT:
+ case Transport::MULTISERIAL_TRANSPORT:
+ grantpt(masterfd_);
+ unlockpt(masterfd_);
+ ASSERT_NO_FATAL_FAILURE(ClientSerial::init_transport(transport_, ptsname(masterfd_), NULL));
+ break;
+
+ default:
+ FAIL() << "Transport type not supported";
+ break;
+ }
+
+ if (creation_mode_ == XRCECreationMode::XRCE_XML_CREATION)
+ {
+ switch (middleware_)
+ {
+ case MiddlewareKind::FASTDDS:
+ ASSERT_NO_FATAL_FAILURE(ClientSerial::create_entities_xml(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::FASTRTPS:
+ ASSERT_NO_FATAL_FAILURE(ClientSerial::create_entities_xml(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::CED:
+ ASSERT_NO_FATAL_FAILURE(ClientSerial::create_entities_xml(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ }
+ else if (creation_mode_ == XRCECreationMode::XRCE_BIN_CREATION)
+ {
+ switch (middleware_)
+ {
+ case MiddlewareKind::FASTDDS:
+ ASSERT_NO_FATAL_FAILURE(ClientSerial::create_entities_bin(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ case MiddlewareKind::CED:
+ ASSERT_NO_FATAL_FAILURE(ClientSerial::create_entities_bin(id_, 0x80, UXR_STATUS_OK, 0));
+ break;
+ default:
+ FAIL() << "Creation mode not supported";
+ break;
+ }
+ }
+ }
+
+ void close()
+ {
+ ASSERT_NO_FATAL_FAILURE(ClientSerial::close_transport(transport_));
+ }
+
+ uint8_t id_;
+
+private:
+ Transport transport_;
+ MiddlewareKind middleware_;
+ XRCECreationMode creation_mode_;
+ int masterfd_;
+};
+
+
+class PublisherSubscriberSerial : public ::testing::TestWithParam>
+{
+public:
+ PublisherSubscriberSerial()
+ : transport_(std::get<0>(GetParam()))
+ , agent_(transport_, (MiddlewareKind) std::get<1>(GetParam()))
+ , clients_{}
+ {
+ agent_.start();
+ agent_.wait_multiserial_open();
+
+ for (auto & element : agent_.getfd_multi())
+ {
+ PubSubSerial entity(GetParam(), element, element);
+ clients_.push_back(entity);
+ }
+ }
+
+ ~PublisherSubscriberSerial()
+ {}
+
+ void SetUp() override
+ {
+ for (auto & entity : clients_)
+ {
+ entity.init();
+ }
+ }
+
+ void TearDown() override
+ {
+ for (auto & entity : clients_)
+ {
+ ASSERT_NO_FATAL_FAILURE(entity.close());
+ }
+ }
+
+ void check_messages(std::string message, size_t number, uint8_t stream_id_raw)
+ {
+ int expected_number = (clients_.size()-1)*number;
+ std::thread subscriber_thread(&ClientSerial::subscribe, &clients_[0], clients_[0].id_, stream_id_raw, expected_number, message);
+ std::vector pub_thr;
+
+ for (auto it = clients_.begin()+1; it != clients_.end(); it++)
+ {
+ std::thread publisher_thread(&ClientSerial::publish, it, it->id_, stream_id_raw, number, message);
+ pub_thr.push_back(std::move(publisher_thread));
+ }
+
+ for (auto & thr : pub_thr)
+ {
+ if (thr.joinable())
+ {
+ thr.join();
+ }
+ }
+
+ if (subscriber_thread.joinable())
+ {
+ subscriber_thread.join();
+ }
+ }
+
+protected:
+ Transport transport_;
+ AgentSerial agent_;
+ std::vector clients_;
+ static const std::string SMALL_MESSAGE;
+};
+
+const std::string PublisherSubscriberSerial::SMALL_MESSAGE("Hello DDS world!");
+
+TEST_P(PublisherSubscriberSerial, MultiPubSub)
+{
+ std::this_thread::sleep_for(std::chrono::seconds(2)); // Waiting for matching.
+ check_messages(SMALL_MESSAGE, 10, 0x01);
+}
+
+INSTANTIATE_TEST_CASE_P(
+ MultiSerialPubSubBin,
+ PublisherSubscriberSerial,
+ ::testing::Combine(
+ ::testing::Values(Transport::MULTISERIAL_TRANSPORT),
+ ::testing::Values(MiddlewareKind::FASTDDS),
+ ::testing::Values(0.0f),
+ ::testing::Values(XRCECreationMode::XRCE_BIN_CREATION)));