diff --git a/CMakeLists.txt b/CMakeLists.txt
index 648b149..8dd1ab9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -27,11 +27,12 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif()
-project(microxrcedds VERSION "1.3.1")
+project(microxrcedds VERSION "2.0.0")
-set(_client_tag v1.2.5)
-set(_agent_tag v1.4.1)
-set(_gen_tag v1.1.0)
+set(_client_tag master)
+set(_client_version 2.0.0)
+set(_agent_tag master)
+set(_gen_tag master)
###############################################################################
# Build options.
@@ -41,6 +42,7 @@ option(UXRCE_ENABLE_CLIENT "Enable the building and installation of Micro XRCE-D
option(UXRCE_ENABLE_AGENT "Enable the building and installation of Micro XRCE-DDS Agent." ON)
option(UXRCE_ENABLE_GEN "Enable the building and installation of Micro XRCE-DDS Gen." OFF)
option(UXRCE_BUILD_TESTS "Build tests." OFF)
+option(UXRCE_BUILD_PROFILING "Build profiling test executables.")
option(UXRCE_BUILD_CI_TESTS "Build CI tests." OFF)
if(UXRCE_BUILD_CI_TESTS)
@@ -149,6 +151,9 @@ if(UXRCE_ENABLE_AGENT)
-DLIB_INSTALL_DIR:PATH=${LIB_INSTALL_DIR}
-DDATA_INSTALL_DIR:PATH=${DATA_INSTALL_DIR}
-DUAGENT_BUILD_TESTS:BOOL=${UXRCE_BUILD_TESTS}
+ -DUAGENT_P2P_CLIENT_TAG:STRING=${_client_tag}
+ -DUAGENT_P2P_CLIENT_VERSION:STRING=${_client_version}
+ -DUAGENT_BUILD_EXECUTABLE:BOOL=OFF
-DUAGENT_ISOLATED_INSTALL:BOOL=OFF
-DGTEST_INDIVIDUAL:BOOL=ON
DEPENDS
@@ -235,6 +240,10 @@ if(UXRCE_BUILD_TESTS)
)
endif()
+if(UXRCE_BUILD_PROFILING)
+ add_subdirectory(test/profiling)
+endif()
+
###############################################################################
# Install
###############################################################################
diff --git a/Dockerfile b/Dockerfile
index bbab829..b8bc25b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -34,7 +34,7 @@ RUN cd /uxrce/build && \
-DCMAKE_INSTALL_PREFIX=../install \
-DUXRCE_BUILD_EXAMPLES=ON \
.. &&\
- make && make install
+ make -j $(nproc) && make install
# Prepare Micro XRCE-DDS artifacts
RUN cd /uxrce && \
diff --git a/README.md b/README.md
index 6e3f27a..4ca6d8f 100644
--- a/README.md
+++ b/README.md
@@ -8,23 +8,34 @@
-*eProsima Micro XRCE-DDS* is a software solution which allows to communicate eXtremely Resource Constrained Environments (XRCEs) with an existing DDS network.
- This implementation complies with the specification proposal, "eXtremely Resource Constrained Environments DDS (DDS-XRCE)" submitted to the Object Management Group (OMG) consortium.
+*eProsima Micro XRCE-DDS* is a library implementing the [DDS-XRCE protocol](https://www.omg.org/spec/DDS-XRCE/About-DDS-XRCE/) as defined and maintained by the OMG, whose aim is to allow resource constrained devices such as microcontrollers to communicate with the [DDS](https://www.omg.org/spec/DDS/About-DDS/>) world as any other DDS actor would do.
+It follows a client/server paradigm and is composed by two libraries, the *Micro XRCE-DDS Client* and the *Micro XRCE-DDS Agent*. The *Micro XRCE-DDS Clients* are lightweight entities meant to be compiled on e**X**tremely **R**esource **C**onstrained **E**nvironments, while the *Micro XRCE-DDS Agent* is a broker which bridges the *Clients* with the DDS world.
-*Micro XRCE-DDS* implements a client-server protocol to enable resource-constrained devices (clients) to take part in DDS communications.
-*Micro XRCE-DDS Agent* (server) makes possible this communication.
-The *Micro XRCE-DDS Agent* acts on behalf of the *Micro XRCE-DDS Clients* and enables them to take part as DDS publishers and/or subscribers in the DDS Global Data Space.
-*Micro XRCE-DDS* provides both, a plug and play *Micro XRCE-DDS Agent* and an API layer which allows you to implement your *Micro XRCE-DDS Clients*.
+
-
+The *Micro XRCE-DDS Clients* request operations to the *Agent* to publish and/or subscribe to topics in the DDS global dataspace. Remote procedure calls, as defined by the [DDS-RPC standard](https://www.omg.org/spec/DDS-RPC/About-DDS-RPC/), are also supported, allowing *Clients* to communicate in the DDS dataspace according to a request/reply paradigm.
+The *Agents* process these requests and send back a response with the operation status result and with the requested data, in the case of subscribe/reply operations.
-This repository contains the totality of *Micro XRCE-DDS* products:
+*eProsima Micro XRCE-DDS* provides the user with a C API to create *Micro XRCE-DDS Clients* applications. The library can be configured at compile-time via a set of CMake flags allowing to enable or disable some profiles before compilation, and to manipulate several parameters controlling some of the library's functionalities, which in turn allow tuning the library size.
-
+The *Micro XRCE-DDS Agent* receives messages containing request operations from the *Clients*, processes these requests and sends back a response with the operation status result and with the requested data, in the case of subscribe/reply operations.
+
+*Agents* keep track of the *Clients* by means of a dedicated `ProxyClient` entity that acts on behalf of the latter.
+This is made possible by the creation of *DDS Entities* on the *Agent* as a result of *Clients*' operations, such as *Participants*, *Topics*, *Publishers*, and *Subscribers*, which can interact with the DDS global dataspace.
+
+The communication between a *Micro XRCE-DDS Client* and a *Micro XRCE-DDS Agent* is achieved by means of several kinds of built-in transports: **UDPv4**, **UDPv6**, **TCPv4**, **TCPv6** and **Serial** communication. In addition, there is the possibility for the user to generate its own **Custom** transport.
+
+
+
+This repository contains the totality of the *eProsima Micro XRCE-DDS* products:
+
+- [*Micro XRCE-DDS Client*](https://github.com/eProsima/Micro-XRCE-DDS-Client)
+- [*Micro XRCE-DDS Agent*](https://github.com/eProsima/Micro-XRCE-DDS-Agent)
+- [*Micro XRCE-DDS Gen*](https://github.com/eProsima/Micro-XRCE-DDS-Gen)
## Documentation
-You can access Micro XRCE-DDS documentation online, which is hosted on Read the Docs.
+You can access the *eProsima Micro XRCE-DDS* user documentation online, which is hosted on Read the Docs.
* [Start Page](http://micro-xrce-dds.readthedocs.io)
* [Installation manual](http://micro-xrce-dds.readthedocs.io/en/latest/installation.html)
diff --git a/ci/linux/CMakeLists.txt b/ci/linux/CMakeLists.txt
index ee6e112..9ee5a57 100644
--- a/ci/linux/CMakeLists.txt
+++ b/ci/linux/CMakeLists.txt
@@ -24,10 +24,10 @@ include(ExternalProject)
include(CheckCCompilerFlag)
include(CheckCXXCompilerFlag)
-set(_c_flags "-fwrapv -fprofile-arcs -ftest-coverage")
-set(_cxx_flags "-fwrapv -fprofile-arcs -ftest-coverage")
-set(_exe_linker_flags "-fprofile-arcs -ftest-coverage")
-set(_shared_linker_flags "-fprofile-arcs -ftest-coverage")
+set(_c_flags "-fwrapv -fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline")
+set(_cxx_flags "-fwrapv -fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline")
+set(_exe_linker_flags "-fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline")
+set(_shared_linker_flags "-fprofile-arcs -ftest-coverage --coverage -fno-inline -fno-inline-small-functions -fno-default-inline")
check_cxx_compiler_flag("-fprofile-abs-path" _have_fprofile_abs_path)
if(_have_fprofile_abs_path)
diff --git a/docs/Concept.png b/docs/Concept.png
new file mode 100644
index 0000000..e63c715
Binary files /dev/null and b/docs/Concept.png differ
diff --git a/docs/General.png b/docs/General.png
new file mode 100644
index 0000000..f073364
Binary files /dev/null and b/docs/General.png differ
diff --git a/docs/general_architecture.png b/docs/general_architecture.png
deleted file mode 100644
index 525fbb9..0000000
Binary files a/docs/general_architecture.png and /dev/null differ
diff --git a/docs/xrcedds_architecture.png b/docs/xrcedds_architecture.png
deleted file mode 100644
index 1990a40..0000000
Binary files a/docs/xrcedds_architecture.png and /dev/null differ
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 89032ab..6e6f8ec 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -46,4 +46,5 @@ add_subdirectory(test/interaction_client)
add_subdirectory(test/client_agent)
add_subdirectory(test/publisher_subscriber)
add_subdirectory(test/discovery)
+add_subdirectory(test/custom_transports)
#add_subdirectory(test/shapes_demo) TODO (julibert): fix client and agent paths.
diff --git a/test/profiling/CMakeLists.txt b/test/profiling/CMakeLists.txt
new file mode 100644
index 0000000..d4f6acb
--- /dev/null
+++ b/test/profiling/CMakeLists.txt
@@ -0,0 +1,96 @@
+# Copyright 2017-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5)
+
+project(microxrcedds-profiling)
+
+ExternalProject_Add(clientlib
+ GIT_REPOSITORY
+ https://github.com/eProsima/Micro-XRCE-DDS-Client.git
+ GIT_TAG
+ ${_client_tag}
+ PREFIX
+ ${PROJECT_BINARY_DIR}/client
+ INSTALL_DIR
+ ${PROJECT_BINARY_DIR}/install
+ CMAKE_CACHE_ARGS
+ -DCMAKE_INSTALL_PREFIX:PATH=
+ )
+
+ExternalProject_Add(agentlib
+ GIT_REPOSITORY
+ https://github.com/eProsima/Micro-XRCE-DDS-Agent.git
+ GIT_TAG
+ ${_agent_tag}
+ PREFIX
+ ${PROJECT_BINARY_DIR}/agent
+ INSTALL_DIR
+ ${PROJECT_BINARY_DIR}/install
+ CMAKE_CACHE_ARGS
+ -DBUILD_SHARED_LIBS:BOOL=FALSE
+ -DCMAKE_BUILD_TYPE:STRING=MinSizeRel
+ -DCMAKE_INSTALL_PREFIX:PATH=
+ -DUAGENT_FAST_PROFILE:BOOL=FALSE
+ -DUAGENT_DISCOVERY_PROFILE:BOOL=FALSE
+ -DUAGENT_P2P_PROFILE:BOOL=FALSE
+ -DUAGENT_LOGGER_PROFILE:BOOL=FALSE
+ -DUAGENT_SERVER_BUFFER_SIZE:STRING=1024
+ )
+
+ExternalProject_Add(publisher
+ SOURCE_DIR
+ ${CMAKE_CURRENT_LIST_DIR}/publisher
+ PREFIX
+ ${PROJECT_BINARY_DIR}/publisher
+ INSTALL_DIR
+ ${PROJECT_BINARY_DIR}/install
+ CMAKE_CACHE_ARGS
+ -DCMAKE_INSTALL_PREFIX:PATH=
+ -DCMAKE_PREFIX_PATH:PATH=${PROJECT_BINARY_DIR}/install
+ DEPENDS
+ clientlib
+ )
+
+ExternalProject_Add(subscriber
+ SOURCE_DIR
+ ${CMAKE_CURRENT_LIST_DIR}/subscriber
+ PREFIX
+ ${PROJECT_BINARY_DIR}/subscriber
+ INSTALL_DIR
+ ${PROJECT_BINARY_DIR}/install
+ CMAKE_CACHE_ARGS
+ -DCMAKE_INSTALL_PREFIX:PATH=
+ -DCMAKE_PREFIX_PATH:PATH=${PROJECT_BINARY_DIR}/install
+ DEPENDS
+ clientlib
+ )
+
+ExternalProject_Add(agent
+ SOURCE_DIR
+ ${CMAKE_CURRENT_LIST_DIR}/agent
+ PREFIX
+ ${PROJECT_BINARY_DIR}/agent
+ INSTALL_DIR
+ ${PROJECT_BINARY_DIR}/install
+ CMAKE_CACHE_ARGS
+ -DCMAKE_INSTALL_PREFIX:PATH=
+ -DCMAKE_PREFIX_PATH:PATH=
+ DEPENDS
+ agentlib
+ )
+
+file(COPY ${CMAKE_CURRENT_LIST_DIR}/agent-profiling.py DESTINATION ${PROJECT_BINARY_DIR})
+file(COPY ${CMAKE_CURRENT_LIST_DIR}/publisher-profiling.py DESTINATION ${PROJECT_BINARY_DIR})
+file(COPY ${CMAKE_CURRENT_LIST_DIR}/subscriber-profiling.py DESTINATION ${PROJECT_BINARY_DIR})
\ No newline at end of file
diff --git a/test/profiling/README.md b/test/profiling/README.md
new file mode 100644
index 0000000..d978ce4
--- /dev/null
+++ b/test/profiling/README.md
@@ -0,0 +1,24 @@
+# Profiling test
+
+Profiling test is composed of three components:
+
+* A minimal Agent with just `UAGENT_CED_PROFILE` enable.
+* A publisher developed with the Client library.
+* A subscriber developed with the Client library.
+
+## Why?
+
+There are some scenarios where the Agent must run over low resource devices, so it is needed to characterize the minimum requirements of the Agent in terms of memory footprint.
+In that sense, the Agent profiling with the lowest memory footprint is the one that only use the `CedMiddleware` implementation without logger, cli, and p2p.
+Such configuration could be address disabling all the Agent's profiles except the `UAGENT_CED_PROFILE`.
+
+## How?
+
+```bash
+cmake .. -DUXRCE_ENABLE_CLIENT=OFF -DUXRCE_ENABLE_AGENT=OFF -DUXRCE_BUILD_PROFILING=ON
+cmake --build .
+cd build/test/profiling
+python3 agent-profiling.py
+python3 publisher-profiling.py
+python3 subscriber-profiling.py
+```
\ No newline at end of file
diff --git a/test/profiling/agent-profiling.py b/test/profiling/agent-profiling.py
new file mode 100644
index 0000000..9a6dd5d
--- /dev/null
+++ b/test/profiling/agent-profiling.py
@@ -0,0 +1,70 @@
+import msparser
+import os
+import signal
+import subprocess
+import sys
+import time
+import matplotlib.pyplot as plt
+from tabulate import tabulate
+
+n_pubsub = [1, 1 << 1, 1 << 2, 1 << 3, 1 << 4, 1 << 5]
+topic_size = [1 << 8, 1 << 3]
+stack_usage = []
+heap_usage = []
+total_usage = []
+
+for t in topic_size:
+ stack = [t]
+ heap = [t]
+ total = [t]
+ for n in n_pubsub:
+ agent_sp = subprocess.Popen(("valgrind --tool=massif --stacks=yes --detailed-freq=1 --max-snapshots=300 --threshold=0.0 --massif-out-file=./massif-agent.out ./install/bin/agent-profiling").split(), shell=False)
+ time.sleep(1)
+
+ client_key = 1
+ sub_sps = []
+ pub_sps = []
+ for i in range(n):
+ sub_sps.append(subprocess.Popen(["./install/bin/subscriber-profiling {} topic_name_{}".format(client_key, i)], shell=True))
+ client_key += 1
+ pub_sps.append(subprocess.Popen(["./install/bin/publisher-profiling {} topic_name_{} {}".format(client_key, i, t)], shell=True))
+ client_key += 1
+
+ time.sleep(5)
+
+ for i in range(n):
+ pub_sps[i].terminate()
+ sub_sps[i].terminate()
+
+ agent_sp.send_signal(signal.SIGINT)
+ time.sleep(1.0)
+
+ data = msparser.parse_file('massif-agent.out')
+ peak_index = data['peak_snapshot_index']
+ peak_snapshot = data['snapshots'][peak_index]
+ for c in peak_snapshot['heap_tree']['children']:
+ if c['details'] and c['details']['function'] == '???':
+ std_heap = c['nbytes']
+
+ stack.append(round((peak_snapshot['mem_stack'] / 1000), 2))
+ heap.append(round((peak_snapshot['mem_heap'] / 1000), 2))
+ total.append(round(((peak_snapshot['mem_stack'] + peak_snapshot['mem_heap'] + peak_snapshot['mem_heap_extra'] - std_heap) / 1000), 2))
+
+ stack_usage.append(stack)
+ heap_usage.append(heap)
+ total_usage.append(total)
+
+fig, ax = plt.subplots()
+for i in range(len(topic_size)):
+ ax.scatter(n_pubsub, total_usage[i][1:], label="topic size = {} B".format(topic_size[i]))
+ ax.set_ylim(bottom=1)
+ax.set_xlabel("Number of topics")
+ax.set_ylabel("Memory usage (KB)")
+ax.legend()
+plt.show()
+
+table_header = ["topic size (B) / #topics"]
+for n in n_pubsub:
+ table_header.append(n)
+
+print(tabulate(total_usage, headers=table_header))
\ No newline at end of file
diff --git a/test/profiling/agent/CMakeLists.txt b/test/profiling/agent/CMakeLists.txt
new file mode 100644
index 0000000..72ba846
--- /dev/null
+++ b/test/profiling/agent/CMakeLists.txt
@@ -0,0 +1,34 @@
+
+# Copyright 2017-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5)
+
+project(agent-profiling)
+
+find_package(microxrcedds_agent)
+
+add_executable(${PROJECT_NAME} main.cpp)
+
+target_link_libraries(${PROJECT_NAME}
+ PRIVATE
+ microxrcedds_agent
+ )
+
+install(
+ TARGETS
+ ${PROJECT_NAME}
+ RUNTIME DESTINATION
+ bin
+ )
\ No newline at end of file
diff --git a/test/profiling/agent/main.cpp b/test/profiling/agent/main.cpp
new file mode 100644
index 0000000..7ac887c
--- /dev/null
+++ b/test/profiling/agent/main.cpp
@@ -0,0 +1,27 @@
+// Copyright 2017-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+int main()
+{
+ eprosima::uxr::UDPv4Agent agent(2020, eprosima::uxr::Middleware::Kind::CED);
+ agent.start();
+
+ while (true) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ return 0;
+}
\ No newline at end of file
diff --git a/test/profiling/publisher-profiling.py b/test/profiling/publisher-profiling.py
new file mode 100644
index 0000000..c90c7df
--- /dev/null
+++ b/test/profiling/publisher-profiling.py
@@ -0,0 +1,35 @@
+import msparser
+import os
+import signal
+import subprocess
+import sys
+import time
+from tabulate import tabulate
+
+agent_sp = subprocess.Popen(["./install/bin/agent-profiling"], shell=True)
+time.sleep(1)
+sub_sp = subprocess.Popen(["./install/bin/subscriber-profiling 1 topic_name"], shell=True)
+pub_sp = subprocess.Popen(("valgrind --tool=massif --stacks=yes --detailed-freq=1 --max-snapshots=300 --threshold=1.0 --massif-out-file=./massif-publisher.out ./install/bin/publisher-profiling 2 topic_name 8").split(), shell=False)
+
+time.sleep(5)
+
+pub_sp.send_signal(signal.SIGINT)
+sub_sp.terminate()
+agent_sp.terminate()
+time.sleep(1)
+
+std_heap_usage = 0
+data = msparser.parse_file('massif-publisher.out')
+peak_index = data['peak_snapshot_index']
+peak_snapshot = data['snapshots'][peak_index]
+for c in peak_snapshot['heap_tree']['children']:
+ if c['details'] and c['details']['function'] == '???':
+ std_heap_usage = c['nbytes']
+
+stack_usage = round((peak_snapshot['mem_stack'] / 1000), 2)
+heap_usage = round((peak_snapshot['mem_heap'] / 1000), 2)
+total_usage = round(((peak_snapshot['mem_stack'] + peak_snapshot['mem_heap'] + peak_snapshot['mem_heap_extra'] - std_heap_usage) / 1000), 2)
+
+print("stack usage: ", stack_usage)
+print("heap usage: ", heap_usage)
+print("total usage: ", total_usage)
\ No newline at end of file
diff --git a/test/profiling/publisher/CMakeLists.txt b/test/profiling/publisher/CMakeLists.txt
new file mode 100644
index 0000000..4b33dba
--- /dev/null
+++ b/test/profiling/publisher/CMakeLists.txt
@@ -0,0 +1,34 @@
+
+# Copyright 2017-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5)
+
+project(publisher-profiling)
+
+find_package(microxrcedds_client)
+
+add_executable(${PROJECT_NAME} main.c)
+
+target_link_libraries(${PROJECT_NAME}
+ PRIVATE
+ microxrcedds_client
+ )
+
+install(
+ TARGETS
+ ${PROJECT_NAME}
+ RUNTIME DESTINATION
+ bin
+ )
\ No newline at end of file
diff --git a/test/profiling/publisher/main.c b/test/profiling/publisher/main.c
new file mode 100644
index 0000000..02e7cb0
--- /dev/null
+++ b/test/profiling/publisher/main.c
@@ -0,0 +1,115 @@
+// Copyright 2017-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+
+#include //printf
+#include //strcmp
+#include //atoi
+
+#define STREAM_HISTORY 2
+#define BUFFER_SIZE 100 * STREAM_HISTORY
+
+int main(int args, char** argv)
+{
+ // CLI
+ if(4 > args)
+ {
+ printf("usage: client_key topic_name topic_size \n");
+ return 0;
+ }
+
+ uint32_t client_key = (uint32_t)atoi(argv[1]);
+ char* topic_name = argv[2];
+ uint16_t topic_size = (uint16_t)atoi(argv[3]);
+ topic_size = (4 > topic_size) ? 4 : topic_size;
+ uint32_t max_topics = 32u;
+
+ // Transport
+ uxrUDPTransport transport;
+ if(!uxr_init_udp_transport(&transport, UXR_IPv4, "127.0.0.1", "2020"))
+ {
+ printf("Error at create transport.\n");
+ return 1;
+ }
+
+ // Session
+ uxrSession session;
+ uxr_init_session(&session, &transport.comm, client_key);
+ if(!uxr_create_session(&session))
+ {
+ printf("Error at create session.\n");
+ return 1;
+ }
+
+ // Streams
+ uint8_t output_besteffort_stream_buffer[UXR_CONFIG_UDP_TRANSPORT_MTU];
+ uxrStreamId besteffort_out = uxr_create_output_best_effort_stream(&session, output_besteffort_stream_buffer, UXR_CONFIG_UDP_TRANSPORT_MTU);
+
+ uint8_t output_reliable_stream_buffer[BUFFER_SIZE];
+ uxrStreamId reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream_buffer, BUFFER_SIZE, STREAM_HISTORY);
+
+ uint8_t input_reliable_stream_buffer[BUFFER_SIZE];
+ uxr_create_input_reliable_stream(&session, input_reliable_stream_buffer, BUFFER_SIZE, STREAM_HISTORY);
+
+ // Create entities
+ uxrObjectId participant_id = uxr_object_id(0x01, UXR_PARTICIPANT_ID);
+ const char* participant_ref = "participant_name";
+ uint16_t participant_req = uxr_buffer_create_participant_ref(&session, reliable_out, participant_id, 0, participant_ref, UXR_REPLACE);
+
+ uxrObjectId topic_id = uxr_object_id(0x01, UXR_TOPIC_ID);
+ const char* topic_ref = topic_name;
+ uint16_t topic_req = uxr_buffer_create_topic_ref(&session, reliable_out, topic_id, participant_id, topic_ref, UXR_REPLACE);
+
+ uxrObjectId publisher_id = uxr_object_id(0x01, UXR_PUBLISHER_ID);
+ const char* publisher_xml = "";
+ uint16_t publisher_req = uxr_buffer_create_publisher_xml(&session, reliable_out, publisher_id, participant_id, publisher_xml, UXR_REPLACE);
+
+ uxrObjectId datawriter_id = uxr_object_id(0x01, UXR_DATAWRITER_ID);
+ const char* datawriter_ref = topic_ref;
+ uint16_t datawriter_req = uxr_buffer_create_datawriter_xml(&session, reliable_out, datawriter_id, publisher_id, datawriter_ref, UXR_REPLACE);
+
+ // Send create entities message and wait its status
+ uint8_t status[4];
+ uint16_t requests[4] = {participant_req, topic_req, publisher_req, datawriter_req};
+ if(!uxr_run_session_until_all_status(&session, 1000, requests, status, 4))
+ {
+ printf("Error at create entities: participant: %i topic: %i publisher: %i darawriter: %i\n", status[0], status[1], status[2], status[3]);
+ return 1;
+ }
+
+ // Write topics
+ bool connected = true;
+ uint32_t count = 0;
+ while(connected && count < max_topics)
+ {
+ char topic[300] = {};
+ memset(topic, 'A', topic_size);
+
+ ucdrBuffer ub;
+ uxr_prepare_output_stream(&session, besteffort_out, datawriter_id, &ub, 4 + strlen(topic));
+ ucdr_serialize_string(&ub, topic);
+
+ printf("Send topic %s, by %d\n", topic, client_key);
+ connected = uxr_run_session_time(&session, 50);
+ ++count;
+ }
+
+ // Delete resources
+ uxr_delete_session(&session);
+ uxr_close_udp_transport(&transport);
+
+ return 0;
+}
diff --git a/test/profiling/subscriber-profiling.py b/test/profiling/subscriber-profiling.py
new file mode 100644
index 0000000..a66852b
--- /dev/null
+++ b/test/profiling/subscriber-profiling.py
@@ -0,0 +1,35 @@
+import msparser
+import os
+import signal
+import subprocess
+import sys
+import time
+from tabulate import tabulate
+
+agent_sp = subprocess.Popen(["./install/bin/agent-profiling"], shell=True)
+time.sleep(1)
+sub_sp = subprocess.Popen(("valgrind --tool=massif --stacks=yes --detailed-freq=1 --max-snapshots=300 --threshold=1.0 --massif-out-file=./massif-subscriber.out ./install/bin/subscriber-profiling 1 topic_name").split(), shell=False)
+pub_sp = subprocess.Popen(["./install/bin/publisher-profiling 2 topic_name 8"], shell=True)
+
+time.sleep(5)
+
+pub_sp.terminate()
+sub_sp.send_signal(signal.SIGINT)
+agent_sp.terminate()
+time.sleep(1)
+
+std_heap_usage = 0
+data = msparser.parse_file('massif-subscriber.out')
+peak_index = data['peak_snapshot_index']
+peak_snapshot = data['snapshots'][peak_index]
+for c in peak_snapshot['heap_tree']['children']:
+ if c['details'] and c['details']['function'] == '???':
+ std_heap_usage = c['nbytes']
+
+stack_usage = round((peak_snapshot['mem_stack'] / 1000), 2)
+heap_usage = round((peak_snapshot['mem_heap'] / 1000), 2)
+total_usage = round(((peak_snapshot['mem_stack'] + peak_snapshot['mem_heap'] + peak_snapshot['mem_heap_extra'] - std_heap_usage) / 1000), 2)
+
+print("stack usage: ", stack_usage)
+print("heap usage: ", heap_usage)
+print("total usage: ", total_usage)
\ No newline at end of file
diff --git a/test/profiling/subscriber/CMakeLists.txt b/test/profiling/subscriber/CMakeLists.txt
new file mode 100644
index 0000000..a9c6090
--- /dev/null
+++ b/test/profiling/subscriber/CMakeLists.txt
@@ -0,0 +1,34 @@
+
+# Copyright 2017-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5)
+
+project(subscriber-profiling)
+
+find_package(microxrcedds_client)
+
+add_executable(${PROJECT_NAME} main.c)
+
+target_link_libraries(${PROJECT_NAME}
+ PRIVATE
+ microxrcedds_client
+ )
+
+install(
+ TARGETS
+ ${PROJECT_NAME}
+ RUNTIME DESTINATION
+ bin
+ )
\ No newline at end of file
diff --git a/test/profiling/subscriber/main.c b/test/profiling/subscriber/main.c
new file mode 100644
index 0000000..b595968
--- /dev/null
+++ b/test/profiling/subscriber/main.c
@@ -0,0 +1,132 @@
+// Copyright 2017-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+#include //printf
+#include //strcmp
+#include //atoi
+
+#define STREAM_HISTORY 2
+#define BUFFER_SIZE 100 * STREAM_HISTORY
+
+uint32_t client_key;
+
+void on_topic(
+ uxrSession* session,
+ uxrObjectId object_id,
+ uint16_t request_id,
+ uxrStreamId stream_id,
+ struct ucdrBuffer* ub,
+ uint16_t length,
+ void* args)
+{
+ (void) session; (void) object_id; (void) request_id; (void) stream_id; (void) length;
+
+ char topic[300];
+ ucdr_deserialize_string(ub, topic, sizeof(topic));
+
+ printf("Received topic %s, by %d\n", topic, client_key);
+
+ uint32_t* count_ptr = (uint32_t*) args;
+ (*count_ptr)++;
+}
+
+int main(int args, char** argv)
+{
+ // CLI
+ if(3 > args)
+ {
+ printf("usage: client_key topic_name\n");
+ return 0;
+ }
+
+ client_key = (uint32_t)atoi(argv[1]);
+ char* topic_name = argv[2];
+ uint32_t max_topics = 28u;
+ uint32_t count = 0;
+
+ // Transport
+ uxrUDPTransport transport;
+ if(!uxr_init_udp_transport(&transport, UXR_IPv4, "127.0.0.1", "2020"))
+ {
+ printf("Error at create transport.\n");
+ return 1;
+ }
+
+ // Session
+ uxrSession session;
+ uxr_init_session(&session, &transport.comm, client_key);
+ uxr_set_topic_callback(&session, on_topic, &count);
+ if(!uxr_create_session(&session))
+ {
+ printf("Error at create session.\n");
+ return 1;
+ }
+
+ // Streams
+ uint8_t output_besteffort_stream_buffer[UXR_CONFIG_UDP_TRANSPORT_MTU];
+ uxrStreamId besteffort_out = uxr_create_output_best_effort_stream(&session, output_besteffort_stream_buffer, UXR_CONFIG_UDP_TRANSPORT_MTU);
+
+ uint8_t output_reliable_stream_buffer[BUFFER_SIZE];
+ uxrStreamId reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream_buffer, BUFFER_SIZE, STREAM_HISTORY);
+
+ uint8_t input_reliable_stream_buffer[BUFFER_SIZE];
+ uxrStreamId reliable_in = uxr_create_input_reliable_stream(&session, input_reliable_stream_buffer, BUFFER_SIZE, STREAM_HISTORY);
+
+ // Create entities
+ uxrObjectId participant_id = uxr_object_id(0x01, UXR_PARTICIPANT_ID);
+ const char* participant_ref = "participant_name";
+ uint16_t participant_req = uxr_buffer_create_participant_ref(&session, reliable_out, participant_id, 0, participant_ref, UXR_REPLACE);
+
+ uxrObjectId topic_id = uxr_object_id(0x01, UXR_TOPIC_ID);
+ const char* topic_ref = topic_name;
+ uint16_t topic_req = uxr_buffer_create_topic_ref(&session, reliable_out, topic_id, participant_id, topic_ref, UXR_REPLACE);
+
+ uxrObjectId subscriber_id = uxr_object_id(0x01, UXR_SUBSCRIBER_ID);
+ const char* subscriber_xml = "";
+ uint16_t subscriber_req = uxr_buffer_create_subscriber_xml(&session, reliable_out, subscriber_id, participant_id, subscriber_xml, UXR_REPLACE);
+
+ uxrObjectId datareader_id = uxr_object_id(0x01, UXR_DATAREADER_ID);
+ const char* datareader_ref = topic_ref;
+ uint16_t datareader_req = uxr_buffer_create_datareader_ref(&session, reliable_out, datareader_id, subscriber_id, datareader_ref, UXR_REPLACE);
+
+ // Send create entities message and wait its status
+ uint8_t status[4];
+ uint16_t requests[4] = {participant_req, topic_req, subscriber_req, datareader_req};
+ if(!uxr_run_session_until_all_status(&session, 1000, requests, status, 4))
+ {
+ printf("Error at create entities: participant: %i topic: %i subscriber: %i datareader: %i\n", status[0], status[1], status[2], status[3]);
+ return 1;
+ }
+
+ // Request topics
+ uxrDeliveryControl delivery_control = {0};
+ delivery_control.max_samples = UXR_MAX_SAMPLES_UNLIMITED;
+ uint16_t read_data_req = uxr_buffer_request_data(&session, besteffort_out, datareader_id, reliable_in, &delivery_control);
+
+ // Read topics
+ bool connected = true;
+ while(connected && count < max_topics)
+ {
+ uint8_t read_data_status;
+ connected = uxr_run_session_until_all_status(&session, UXR_TIMEOUT_INF, &read_data_req, &read_data_status, 1);
+ }
+
+ // Delete resources
+ uxr_delete_session(&session);
+ uxr_close_udp_transport(&transport);
+
+ return 0;
+}
diff --git a/test/test/client_agent/ClientAgentInteraction.cpp b/test/test/client_agent/ClientAgentInteraction.cpp
index 99c228e..7a1d17e 100644
--- a/test/test/client_agent/ClientAgentInteraction.cpp
+++ b/test/test/client_agent/ClientAgentInteraction.cpp
@@ -12,6 +12,8 @@
#include
#endif
+#include
+
#include
#include
@@ -21,6 +23,7 @@ class ClientAgentInteraction : public ::testing::TestWithParam(GetParam()));
const float LOST = 0.1f;
+ static const uint8_t INIT_CLOSE_RETRIES = 20;
ClientAgentInteraction()
: transport_(std::get<0>(GetParam()))
@@ -47,13 +50,26 @@ class ClientAgentInteraction : public ::testing::TestWithParamset_verbose_level(6);
ASSERT_TRUE(agent_udp4_->start());
break;
+ }
case Transport::UDP_IPV6_TRANSPORT:
+ {
agent_udp6_.reset(new eprosima::uxr::UDPv6Agent(port, middleware_));
agent_udp6_->set_verbose_level(6);
ASSERT_TRUE(agent_udp6_->start());
break;
+ }
case Transport::TCP_IPV4_TRANSPORT:
+ {
agent_tcp4_.reset(new eprosima::uxr::TCPv4Agent(port, middleware_));
agent_tcp4_->set_verbose_level(6);
ASSERT_TRUE(agent_tcp4_->start());
break;
+ }
case Transport::TCP_IPV6_TRANSPORT:
+ {
agent_tcp6_.reset(new eprosima::uxr::TCPv6Agent(port, middleware_));
agent_tcp6_->set_verbose_level(6);
ASSERT_TRUE(agent_tcp6_->start());
break;
+ }
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ {
+ try
+ {
+ agent_custom_endpoint_.add_member("index");
+ }
+ catch(const std::exception& /*e*/)
+ {
+ // Do nothing
+ }
+
+
+ agent_custom_.reset(new eprosima::uxr::CustomAgent(
+ "custom_agent",
+ &agent_custom_endpoint_,
+ middleware_,
+ false,
+ agent_custom_transport_open,
+ agent_custom_transport_close,
+ agent_custom_transport_write_packet,
+ agent_custom_transport_read_packet));
+ agent_custom_->set_verbose_level(6);
+ ASSERT_TRUE(agent_custom_->start());
+ break;
+ }
+ case Transport::CUSTOM_WITH_FRAMING:
+ {
+ try
+ {
+ agent_custom_endpoint_.add_member("index");
+ }
+ catch(const std::exception& /*e*/)
+ {
+ // Do nothing
+ }
+
+ agent_custom_.reset(new eprosima::uxr::CustomAgent(
+ "custom_agent",
+ &agent_custom_endpoint_,
+ middleware_,
+ true,
+ agent_custom_transport_open,
+ agent_custom_transport_close,
+ agent_custom_transport_write_stream,
+ agent_custom_transport_read_stream));
+ agent_custom_->set_verbose_level(6);
+ ASSERT_TRUE(agent_custom_->start());
+ break;
+ }
+
}
}
@@ -96,17 +170,31 @@ class ClientAgentInteraction : public ::testing::TestWithParamstop());
break;
+ }
case Transport::UDP_IPV6_TRANSPORT:
+ {
ASSERT_TRUE(agent_udp6_->stop());
break;
+ }
case Transport::TCP_IPV4_TRANSPORT:
+ {
ASSERT_TRUE(agent_tcp4_->stop());
break;
+ }
case Transport::TCP_IPV6_TRANSPORT:
+ {
ASSERT_TRUE(agent_tcp6_->stop());
- break;
+ break;
+ }
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ case Transport::CUSTOM_WITH_FRAMING:
+ {
+ ASSERT_TRUE(agent_custom_->stop());
+ break;
+ }
}
}
@@ -116,13 +204,16 @@ class ClientAgentInteraction : public ::testing::TestWithParam agent_udp6_;
std::unique_ptr agent_tcp4_;
std::unique_ptr agent_tcp6_;
+ std::unique_ptr agent_custom_;
+ eprosima::uxr::CustomEndPoint agent_custom_endpoint_;
+
eprosima::uxr::Middleware::Kind middleware_;
Client client_;
};
TEST_P(ClientAgentInteraction, InitCloseSession)
{
- for (int i = 0; i < 20; ++i)
+ for (int i = 0; i < ClientAgentInteraction::INIT_CLOSE_RETRIES; ++i)
{
TearDown();
SetUp();
@@ -133,15 +224,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationXMLBestEffort)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
}
}
@@ -149,15 +246,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationXMLReliable)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
}
}
@@ -165,15 +268,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationREFBestEffort)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x01, UXR_STATUS_OK, 0));
+ break;
+ }
}
}
@@ -181,15 +290,21 @@ TEST_P(ClientAgentInteraction, NewEntitiesCreationREFReliable)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
+ break;
+ }
}
}
@@ -197,18 +312,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseXMLXMLReliable)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
}
}
@@ -224,18 +345,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReuseREFREFReliable)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_ref(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REUSE));
+ break;
+ }
}
}
@@ -243,18 +370,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReliable)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, UXR_REPLACE));
+ break;
+ }
}
}
@@ -262,18 +395,24 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationNoReplaceReliable)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_ERR_ALREADY_EXISTS, 0));
+ break;
+ }
}
}
@@ -281,21 +420,33 @@ TEST_P(ClientAgentInteraction, ExistantEntitiesCreationReplaceReuseReliable)
{
switch (std::get<1>(GetParam()))
{
- case MiddlewareKind::FASTDDS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
- break;
- case MiddlewareKind::FASTRTPS:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
- break;
- case MiddlewareKind::CED:
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
- ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
- break;
+ case MiddlewareKind::FASTDDS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::FASTRTPS:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
+ break;
+ }
+ case MiddlewareKind::CED:
+ {
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK, 0));
+ ASSERT_NO_FATAL_FAILURE(client_.create_entities_xml(1, 0x80, UXR_STATUS_OK_MATCHED, UXR_REPLACE | UXR_REUSE));
+ break;
+ }
}
}
+TEST_P(ClientAgentInteraction, PingFromClientToAgent)
+{
+ const Transport transport_kind(std::get<0>(GetParam()));
+ ASSERT_NO_FATAL_FAILURE(client_.ping_agent(transport_kind));
+}
+
INSTANTIATE_TEST_CASE_P(
Transports,
ClientAgentInteraction,
@@ -303,6 +454,13 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(Transport::UDP_IPV4_TRANSPORT, Transport::TCP_IPV4_TRANSPORT, Transport::UDP_IPV6_TRANSPORT, Transport::TCP_IPV6_TRANSPORT),
::testing::Values(MiddlewareKind::FASTDDS, MiddlewareKind::FASTRTPS, MiddlewareKind::CED)));
+INSTANTIATE_TEST_CASE_P(
+ CustomTransports,
+ ClientAgentInteraction,
+ ::testing::Combine(
+ ::testing::Values(Transport::CUSTOM_WITHOUT_FRAMING, Transport::CUSTOM_WITH_FRAMING),
+ ::testing::Values(MiddlewareKind::FASTDDS)));
+
int main(int args, char** argv)
{
::testing::InitGoogleTest(&args, argv);
diff --git a/test/test/cross_serialization/ClientSerialization.cpp b/test/test/cross_serialization/ClientSerialization.cpp
index 917027d..41d342e 100644
--- a/test/test/cross_serialization/ClientSerialization.cpp
+++ b/test/test/cross_serialization/ClientSerialization.cpp
@@ -148,7 +148,7 @@ std::vector ClientSerialization::info_payload()
payload.object_info.config._.agent.xrce_version = XrceVersion{0x01, 0x23};
payload.object_info.config._.agent.xrce_vendor_id = XrceVendorId{0x45, 0x67};
payload.object_info.activity.kind = DDS_XRCE_OBJK_AGENT;
- payload.object_info.activity._.agent.availibility = 1;
+ payload.object_info.activity._.agent.availability = 1;
payload.object_info.activity._.agent.address_seq.size = 0x01;
payload.object_info.activity._.agent.address_seq.data[0].format = ADDRESS_FORMAT_MEDIUM;
payload.object_info.activity._.agent.address_seq.data[0]._.medium_locator.locator_port = 0x0123;
diff --git a/test/test/custom_transports/CMakeLists.txt b/test/test/custom_transports/CMakeLists.txt
new file mode 100644
index 0000000..3e16661
--- /dev/null
+++ b/test/test/custom_transports/CMakeLists.txt
@@ -0,0 +1,62 @@
+# Copyright 2021-present Proyectos y Sistemas de Mantenimiento SL (eProsima).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# include(${PROJECT_SOURCE_DIR}/cmake/common/check_configuration.cmake)
+
+# cmake_host_system_information(RESULT HOSTNAME_SUFFIX QUERY HOSTNAME)
+
+# configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../common/EntitiesInfo.hpp.in
+# ${CMAKE_CURRENT_BINARY_DIR}/EntitiesInfo.hpp
+# @ONLY
+# )
+
+set(SRCS
+ Custom_transports.cpp
+ )
+
+add_library(custom_transports STATIC ${SRCS})
+
+set_common_compile_options(custom_transports)
+
+if(MSVC OR MSVC_IDE)
+ target_compile_options(custom_transports
+ PRIVATE
+ /wd4996
+ PUBLIC
+ -D_CRT_SECURE_NO_WARNINGS
+ )
+endif()
+
+target_include_directories(custom_transports
+ PUBLIC
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_BINARY_DIR}
+ PRIVATE
+ ${GTEST_INCLUDE_DIRS}
+ )
+
+target_link_libraries(custom_transports
+ PUBLIC
+ microxrcedds_client
+ microxrcedds_agent
+ PRIVATE
+ ${GTEST_BOTH_LIBRARIES}
+ )
+
+set_target_properties(custom_transports PROPERTIES
+ CXX_STANDARD
+ 11
+ CXX_STANDARD_REQUIRED
+ YES
+ )
diff --git a/test/test/custom_transports/Custom_transports.cpp b/test/test/custom_transports/Custom_transports.cpp
new file mode 100644
index 0000000..19013b7
--- /dev/null
+++ b/test/test/custom_transports/Custom_transports.cpp
@@ -0,0 +1,330 @@
+#include "Custom_transports.hpp"
+#include
+
+#include
+#include
+#include
+
+using packet_fifo = std::queue>;
+using stream_fifo = std::queue;
+
+static std::map client_to_agent_packet_queue;
+static std::map agent_to_client_packet_queue;
+static std::map client_to_agent_stream_queue;
+static std::map agent_to_client_stream_queue;
+
+std::mutex transport_mtx;
+
+template
+static int32_t find_queue_with_data(const std::map& m)
+{
+ for (auto const& it : m)
+ {
+ if (!it.second.empty())
+ {
+ return it.first;
+ }
+ }
+
+ return -1;
+}
+
+template
+static void erase_fifo_by_index(std::map& m, const int32_t index)
+{
+ auto it = m.find(index);
+ if (it != m.end())
+ m.erase (it);
+}
+
+eprosima::uxr::CustomAgent::InitFunction agent_custom_transport_open = []() -> bool
+{
+ return true;
+};
+
+eprosima::uxr::CustomAgent::FiniFunction agent_custom_transport_close = []() -> bool
+{
+ std::unique_lock lock(transport_mtx);
+
+ client_to_agent_stream_queue.clear();
+ client_to_agent_packet_queue.clear();
+ agent_to_client_stream_queue.clear();
+ agent_to_client_stream_queue.clear();
+
+ return true;
+};
+
+eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_packet = [](
+ eprosima::uxr::CustomEndPoint* source_endpoint,
+ uint8_t* buffer,
+ size_t buffer_length,
+ int timeout,
+ eprosima::uxr::TransportRc& transport_rc) -> ssize_t
+{
+ size_t rv = 0;
+ int64_t init_time = uxr_millis();
+ bool received = false;
+
+ transport_rc = eprosima::uxr::TransportRc::ok;
+
+ while (uxr_millis() - init_time < timeout)
+ {
+ std::unique_lock lock(transport_mtx);
+
+ int32_t index = find_queue_with_data(client_to_agent_packet_queue);
+ if (0 <= index)
+ {
+ auto data = client_to_agent_packet_queue[index].front();
+ client_to_agent_packet_queue[index].pop();
+
+ if (data.size() <= buffer_length)
+ {
+ std::copy(data.begin(), data.end(), buffer);
+ rv = data.size();
+ received = true;
+ std::cout << "Custom agent receive: " << rv << " bytes in queue " << index << std::endl;
+ source_endpoint->set_member_value("index", static_cast(index));
+ }
+ else
+ {
+ transport_rc = eprosima::uxr::TransportRc::server_error;
+ }
+
+ break;
+ }
+
+ lock.unlock();
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
+ }
+
+ if (!received)
+ {
+ transport_rc = eprosima::uxr::TransportRc::timeout_error;
+ }
+
+ return static_cast(rv);
+};
+
+eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_packet = [](
+ const eprosima::uxr::CustomEndPoint* destination_endpoint,
+ uint8_t* buffer,
+ size_t message_length,
+ eprosima::uxr::TransportRc& transport_rc) -> ssize_t
+{
+ std::unique_lock lock(transport_mtx);
+ int32_t index = static_cast(destination_endpoint->get_member("index"));
+
+ std::vector packet(buffer, buffer + message_length);
+ agent_to_client_packet_queue[index].emplace(std::move(packet));
+ transport_rc = eprosima::uxr::TransportRc::ok;
+ std::cout << "Custom agent send: " << message_length << " bytes." << std::endl;
+
+ return static_cast(message_length);
+};
+
+eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_stream = [](
+ eprosima::uxr::CustomEndPoint* source_endpoint,
+ uint8_t* buffer,
+ size_t buffer_length,
+ int timeout,
+ eprosima::uxr::TransportRc& transport_rc) -> ssize_t
+{
+ size_t rv = 0;
+ int64_t init_time = uxr_millis();
+ bool received = false;
+
+ transport_rc = eprosima::uxr::TransportRc::ok;
+
+ while (uxr_millis() - init_time < timeout)
+ {
+ std::unique_lock lock(transport_mtx);
+
+ int32_t index = find_queue_with_data(client_to_agent_stream_queue);
+ if (0 <= index)
+ {
+ rv = (buffer_length > client_to_agent_stream_queue[index].size()) ?
+ client_to_agent_stream_queue[index].size() :
+ buffer_length;
+
+ for (size_t i = 0; i < rv; i++)
+ {
+ buffer[i] = client_to_agent_stream_queue[index].front();
+ client_to_agent_stream_queue[index].pop();
+ }
+
+ std::cout << "Custom agent receive: " << rv << " bytes in queue " << index << std::endl;
+
+ source_endpoint->set_member_value("index", static_cast(index));
+ received = true;
+
+ break;
+ }
+
+ lock.unlock();
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
+ }
+
+ if (!received)
+ {
+ transport_rc = eprosima::uxr::TransportRc::timeout_error;
+ }
+
+ return static_cast(rv);
+};
+
+eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_stream = [](
+ const eprosima::uxr::CustomEndPoint* destination_endpoint,
+ uint8_t* buffer,
+ size_t message_length,
+ eprosima::uxr::TransportRc& transport_rc) -> ssize_t
+{
+ std::unique_lock lock(transport_mtx);
+ int32_t index = static_cast(destination_endpoint->get_member("index"));
+
+ for (size_t i = 0; i < message_length; i++)
+ {
+ agent_to_client_stream_queue[index].emplace(buffer[i]);
+ }
+
+ transport_rc = eprosima::uxr::TransportRc::ok;
+ std::cout << "Custom agent send: " << message_length << " bytes to queue " << index << std::endl;
+
+ return static_cast(message_length);
+};
+
+
+// Client custom transport
+extern "C"
+{
+ static int32_t global_index = 0;
+
+ bool client_custom_transport_open(uxrCustomTransport* transport)
+ {
+ transport->args = malloc(sizeof(int32_t));
+ *(int32_t*) transport->args = global_index++;
+ int32_t index = *(int32_t*) transport->args;
+
+ std::cout << "Custom client creating: " << index << std::endl;
+
+ return true;
+ }
+
+ bool client_custom_transport_close(uxrCustomTransport* transport)
+ {
+ int32_t index = *(int32_t*) transport->args;
+ free(transport->args);
+
+ std::unique_lock lock(transport_mtx);
+
+ erase_fifo_by_index(client_to_agent_packet_queue, index);
+ erase_fifo_by_index(client_to_agent_stream_queue, index);
+ erase_fifo_by_index(agent_to_client_packet_queue, index);
+ erase_fifo_by_index(agent_to_client_stream_queue, index);
+
+ return true;
+ }
+
+ size_t client_custom_transport_write_packet(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode)
+ {
+ (void) errcode;
+
+ int32_t index = *(int32_t*) transport->args;
+
+ std::unique_lock lock(transport_mtx);
+
+ std::vector packet(buf, buf + len);
+ client_to_agent_packet_queue[index].emplace(std::move(packet));
+ std::cout << "Custom client send: " << len << " bytes in queue " << index << std::endl;
+
+ return len;
+ }
+ size_t client_custom_transport_read_packet(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode)
+ {
+ (void) errcode;
+
+ int32_t index = *(int32_t*) transport->args;
+
+ size_t rv = 0;
+ int64_t init_time = uxr_millis();
+
+ while (uxr_millis() - init_time < timeout)
+ {
+ std::unique_lock lock(transport_mtx);
+
+ if (0 < agent_to_client_packet_queue[index].size())
+ {
+ auto data = agent_to_client_packet_queue[index].front();
+ agent_to_client_packet_queue[index].pop();
+
+ if (data.size() <= len)
+ {
+ std::copy( data.begin(), data.end(), buf);
+ rv = data.size();
+ std::cout << "Custom client receive: " << len << " bytes in queue " << index << std::endl;
+ }
+ else
+ {
+ *errcode = 1;
+ }
+
+ break;
+ }
+
+ lock.unlock();
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
+ }
+
+ return rv;
+ }
+
+ size_t client_custom_transport_write_stream(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode)
+ {
+ (void) errcode;
+
+ int32_t index = *(int32_t*) transport->args;
+
+ std::unique_lock lock(transport_mtx);
+
+ for (size_t i = 0; i < len; i++)
+ {
+ client_to_agent_stream_queue[index].emplace(buf[i]);
+ }
+
+ std::cout << "Custom client send: " << len << " bytes in queue " << index << std::endl;
+
+ return len;
+ }
+ size_t client_custom_transport_read_stream(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode)
+ {
+ (void) errcode;
+
+ int32_t index = *(int32_t*) transport->args;
+
+ size_t rv = 0;
+ int64_t init_time = uxr_millis();
+
+ while (uxr_millis() - init_time < timeout)
+ {
+ std::unique_lock lock(transport_mtx);
+
+ if (0 < agent_to_client_stream_queue[index].size())
+ {
+ rv = (len > agent_to_client_stream_queue[index].size()) ?
+ agent_to_client_stream_queue[index].size() :
+ len;
+
+ for (size_t i = 0; i < rv; i++)
+ {
+ buf[i] = agent_to_client_stream_queue[index].front();
+ agent_to_client_stream_queue[index].pop();
+ }
+
+ break;
+ }
+ lock.unlock();
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
+ }
+
+ return rv;
+ }
+}
diff --git a/test/test/custom_transports/Custom_transports.hpp b/test/test/custom_transports/Custom_transports.hpp
new file mode 100644
index 0000000..341c355
--- /dev/null
+++ b/test/test/custom_transports/Custom_transports.hpp
@@ -0,0 +1,26 @@
+#ifndef IN_TEST_CUSTOM_TRANSPORT_HPP
+#define IN_TEST_CUSTOM_TRANSPORT_HPP
+
+#include
+#include
+
+// Agent custom transports
+extern eprosima::uxr::CustomAgent::InitFunction agent_custom_transport_open;
+extern eprosima::uxr::CustomAgent::FiniFunction agent_custom_transport_close;
+extern eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_stream;
+extern eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_stream;
+extern eprosima::uxr::CustomAgent::RecvMsgFunction agent_custom_transport_read_packet;
+extern eprosima::uxr::CustomAgent::SendMsgFunction agent_custom_transport_write_packet;
+
+// Client custom transport
+extern "C"
+{
+ bool client_custom_transport_open(uxrCustomTransport* transport);
+ bool client_custom_transport_close(uxrCustomTransport* transport);
+ size_t client_custom_transport_write_stream( uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode);
+ size_t client_custom_transport_read_stream( uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode);
+ size_t client_custom_transport_write_packet( uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* errcode);
+ size_t client_custom_transport_read_packet( uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* errcode);
+}
+
+#endif //IN_TEST_CUSTOM_TRANSPORT_HPP
diff --git a/test/test/interaction_client/BigHelloWorld.h b/test/test/interaction_client/BigHelloWorld.h
index 2f9cdac..4cfbd90 100644
--- a/test/test/interaction_client/BigHelloWorld.h
+++ b/test/test/interaction_client/BigHelloWorld.h
@@ -37,7 +37,7 @@ extern "C"
typedef struct BigHelloWorld
{
uint32_t index;
- char message[4096];
+ char message[8192];
} BigHelloWorld;
diff --git a/test/test/interaction_client/CMakeLists.txt b/test/test/interaction_client/CMakeLists.txt
index e0d2aae..1514a7f 100644
--- a/test/test/interaction_client/CMakeLists.txt
+++ b/test/test/interaction_client/CMakeLists.txt
@@ -51,6 +51,7 @@ target_include_directories(interaction_client
target_link_libraries(interaction_client
PUBLIC
microxrcedds_client
+ custom_transports
PRIVATE
${GTEST_BOTH_LIBRARIES}
)
diff --git a/test/test/interaction_client/Client.hpp b/test/test/interaction_client/Client.hpp
index 6a431e3..a167b9d 100644
--- a/test/test/interaction_client/Client.hpp
+++ b/test/test/interaction_client/Client.hpp
@@ -4,19 +4,24 @@
#include "BigHelloWorld.h"
#include "Gateway.hpp"
#include
+#include <../custom_transports/Custom_transports.hpp>
#include
+#include
#include
#include
#include
#include
-enum class Transport {
+enum class Transport
+{
UDP_IPV4_TRANSPORT,
UDP_IPV6_TRANSPORT,
TCP_IPV4_TRANSPORT,
- TCP_IPV6_TRANSPORT
+ TCP_IPV6_TRANSPORT,
+ CUSTOM_WITH_FRAMING,
+ CUSTOM_WITHOUT_FRAMING
};
inline bool operator == (const uxrObjectId& obj1, const uxrObjectId& obj2)
@@ -33,6 +38,9 @@ inline bool operator == (const uxrStreamId& s1, const uxrStreamId& s2)
&& s1.direction == s2.direction;
}
+extern "C" bool flush_session(uxrSession* session){
+ return uxr_run_session_until_confirm_delivery(session, 1000);
+}
class Client
{
@@ -197,8 +205,14 @@ class Client
ucdrBuffer ub;
uint32_t topic_size = BigHelloWorld_size_of_topic(&topic, 0);
- bool prepared = uxr_prepare_output_stream(&session_, output_stream_id, datawriter_id, &ub, topic_size);
- ASSERT_TRUE(prepared);
+ uint16_t prepared = false;
+ if (topic_size < mtu_)
+ {
+ prepared = uxr_prepare_output_stream(&session_, output_stream_id, datawriter_id, &ub, topic_size);
+ } else {
+ prepared = uxr_prepare_output_stream_fragmented(&session_, output_stream_id, datawriter_id, &ub, topic_size, flush_session);
+ }
+ ASSERT_NE(prepared, UXR_INVALID_REQUEST_ID);
bool written = BigHelloWorld_serialize_topic(&ub, &topic);
ASSERT_TRUE(written);
ASSERT_FALSE(ub.error);
@@ -249,24 +263,52 @@ class Client
{
case Transport::UDP_IPV4_TRANSPORT:
mtu_ = UXR_CONFIG_UDP_TRANSPORT_MTU;
- ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, &udp_platform_, UXR_IPv4, ip, port));
+ ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, UXR_IPv4, ip, port));
uxr_init_session(&session_, gateway_.monitorize(&udp_transport_.comm), client_key_);
break;
case Transport::UDP_IPV6_TRANSPORT:
mtu_ = UXR_CONFIG_UDP_TRANSPORT_MTU;
- ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, &udp_platform_, UXR_IPv6, ip, port));
+ ASSERT_TRUE(uxr_init_udp_transport(&udp_transport_, UXR_IPv6, ip, port));
uxr_init_session(&session_, gateway_.monitorize(&udp_transport_.comm), client_key_);
break;
case Transport::TCP_IPV4_TRANSPORT:
mtu_ = UXR_CONFIG_TCP_TRANSPORT_MTU;
- ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, &tcp_platform_, UXR_IPv4, ip, port));
+ ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, UXR_IPv4, ip, port));
uxr_init_session(&session_, gateway_.monitorize(&tcp_transport_.comm), client_key_);
break;
case Transport::TCP_IPV6_TRANSPORT:
mtu_ = UXR_CONFIG_TCP_TRANSPORT_MTU;
- ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, &tcp_platform_, UXR_IPv6, ip, port));
+ ASSERT_TRUE(uxr_init_tcp_transport(&tcp_transport_, UXR_IPv6, ip, port));
uxr_init_session(&session_, gateway_.monitorize(&tcp_transport_.comm), client_key_);
break;
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ mtu_ = UXR_CONFIG_CUSTOM_TRANSPORT_MTU;
+
+ uxr_set_custom_transport_callbacks(
+ &custom_transport_,
+ false,
+ client_custom_transport_open,
+ client_custom_transport_close,
+ client_custom_transport_write_packet,
+ client_custom_transport_read_packet);
+
+ ASSERT_TRUE(uxr_init_custom_transport(&custom_transport_, NULL));
+ uxr_init_session(&session_, gateway_.monitorize(&custom_transport_.comm), client_key_);
+ break;
+ case Transport::CUSTOM_WITH_FRAMING:
+ mtu_ = UXR_CONFIG_CUSTOM_TRANSPORT_MTU;
+
+ uxr_set_custom_transport_callbacks(
+ &custom_transport_,
+ true,
+ client_custom_transport_open,
+ client_custom_transport_close,
+ client_custom_transport_write_stream,
+ client_custom_transport_read_stream);
+
+ ASSERT_TRUE(uxr_init_custom_transport(&custom_transport_, NULL));
+ uxr_init_session(&session_, gateway_.monitorize(&custom_transport_.comm), client_key_);
+ break;
}
init_common();
@@ -295,6 +337,10 @@ class Client
case Transport::TCP_IPV6_TRANSPORT:
ASSERT_TRUE(uxr_close_tcp_transport(&tcp_transport_));
break;
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ case Transport::CUSTOM_WITH_FRAMING:
+ ASSERT_TRUE(uxr_close_custom_transport(&custom_transport_));
+ break;
}
}
@@ -303,6 +349,35 @@ class Client
return mtu_;
}
+ void ping_agent(
+ const Transport transport_kind)
+ {
+ uxrCommunication* comm(nullptr);
+
+ switch (transport_kind)
+ {
+ case Transport::UDP_IPV4_TRANSPORT:
+ case Transport::UDP_IPV6_TRANSPORT:
+ {
+ comm = &udp_transport_.comm;
+ break;
+ }
+ case Transport::TCP_IPV4_TRANSPORT:
+ case Transport::TCP_IPV6_TRANSPORT:
+ {
+ comm = &tcp_transport_.comm;
+ break;
+ }
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ case Transport::CUSTOM_WITH_FRAMING:
+ {
+ comm = &custom_transport_.comm;
+ break;
+ }
+ }
+ ASSERT_TRUE(uxr_ping_agent_attempts(comm, 1000, 1));
+ }
+
private:
void init_common()
{
@@ -315,12 +390,15 @@ class Client
ASSERT_EQ(UXR_STATUS_OK, session_.info.last_requested_status);
/* Setup streams. */
- output_best_effort_stream_buffer_.reset(new uint8_t[mtu_ * UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS]{0});
- output_reliable_stream_buffer_.reset(new uint8_t[mtu_ * history_ * UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS]{0});
- input_reliable_stream_buffer_.reset(new uint8_t[mtu_ * history_ * UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS]{0});
+ output_best_effort_stream_buffer_.reset(
+ new std::vector(mtu_ * UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS, 0));
+ output_reliable_stream_buffer_.reset(
+ new std::vector(mtu_ * history_ * UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS, 0));
+ input_reliable_stream_buffer_.reset(
+ new std::vector(mtu_ * history_ * UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS, 0));
for(size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_BEST_EFFORT_STREAMS; ++i)
{
- uint8_t* buffer = output_best_effort_stream_buffer_.get() + mtu_ * i;
+ uint8_t* buffer = output_best_effort_stream_buffer_->data() + mtu_ * i;
(void) uxr_create_output_best_effort_stream(&session_, buffer, mtu_);
}
for(size_t i = 0; i < UXR_CONFIG_MAX_INPUT_BEST_EFFORT_STREAMS; ++i)
@@ -329,12 +407,12 @@ class Client
}
for(size_t i = 0; i < UXR_CONFIG_MAX_OUTPUT_RELIABLE_STREAMS; ++i)
{
- uint8_t* buffer = output_reliable_stream_buffer_.get() + mtu_ * history_ * i;
+ uint8_t* buffer = output_reliable_stream_buffer_->data() + mtu_ * history_ * i;
(void) uxr_create_output_reliable_stream(&session_, buffer , mtu_ * history_, history_);
}
for(size_t i = 0; i < UXR_CONFIG_MAX_INPUT_RELIABLE_STREAMS; ++i)
{
- uint8_t* buffer = input_reliable_stream_buffer_.get() + mtu_ * history_ * i;
+ uint8_t* buffer = input_reliable_stream_buffer_->data() + mtu_ * history_ * i;
(void) uxr_create_input_reliable_stream(&session_, buffer, mtu_ * history_, history_);
}
}
@@ -377,12 +455,6 @@ class Client
}
static uint32_t next_client_key_;
- static const char* participant_xml_;
- static const char* topic_xml_;
- static const char* publisher_xml_;
- static const char* subscriber_xml_;
- static const char* datawriter_xml_;
- static const char* datareader_xml_;
Gateway gateway_;
@@ -390,16 +462,15 @@ class Client
uint16_t history_;
uxrUDPTransport udp_transport_;
- uxrUDPPlatform udp_platform_;
uxrTCPTransport tcp_transport_;
- uxrTCPPlatform tcp_platform_;
+ uxrCustomTransport custom_transport_;
size_t mtu_;
uxrSession session_;
- std::unique_ptr output_best_effort_stream_buffer_;
- std::unique_ptr output_reliable_stream_buffer_;
- std::unique_ptr input_reliable_stream_buffer_;
+ std::shared_ptr> output_best_effort_stream_buffer_;
+ std::shared_ptr> output_reliable_stream_buffer_;
+ std::shared_ptr> input_reliable_stream_buffer_;
std::string expected_message_;
diff --git a/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp b/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp
index d17621d..67ee1dc 100644
--- a/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp
+++ b/test/test/publisher_subscriber/PublisherSubscriberInteraction.cpp
@@ -13,6 +13,9 @@
#include
#endif
+#include
+
+#include <../custom_transports/Custom_transports.hpp>
#include
@@ -52,11 +55,16 @@ class PublisherSubscriberNoLost : public ::testing::TestWithParam(GetParam()))
{
@@ -105,6 +113,36 @@ class PublisherSubscriberNoLost : public ::testing::TestWithParamset_verbose_level(6);
ASSERT_TRUE(agent_tcp6_->start());
break;
+ case Transport::CUSTOM_WITHOUT_FRAMING:
+ agent_custom_endpoint_.add_member("index");
+
+ agent_custom_.reset(new eprosima::uxr::CustomAgent(
+ "custom_agent",
+ &agent_custom_endpoint_,
+ middleware_,
+ false,
+ agent_custom_transport_open,
+ agent_custom_transport_close,
+ agent_custom_transport_write_packet,
+ agent_custom_transport_read_packet));
+ agent_custom_->set_verbose_level(6);
+ ASSERT_TRUE(agent_custom_->start());
+ break;
+ case Transport::CUSTOM_WITH_FRAMING:
+ agent_custom_endpoint_.add_member("index");
+
+ agent_custom_.reset(new eprosima::uxr::CustomAgent(
+ "custom_agent",
+ &agent_custom_endpoint_,
+ middleware_,
+ true,
+ agent_custom_transport_open,
+ agent_custom_transport_close,
+ agent_custom_transport_write_stream,
+ agent_custom_transport_read_stream));
+ agent_custom_->set_verbose_level(6);
+ ASSERT_TRUE(agent_custom_->start());
+ break;
}
}
@@ -123,6 +161,9 @@ class PublisherSubscriberNoLost : public ::testing::TestWithParam agent_udp6_;
std::unique_ptr agent_tcp4_;
std::unique_ptr agent_tcp6_;
+ std::unique_ptr agent_custom_;
+ eprosima::uxr::CustomEndPoint agent_custom_endpoint_;
+
eprosima::uxr::Middleware::Kind middleware_;
Client publisher_;
Client subscriber_;
@@ -157,6 +198,12 @@ TEST_P(PublisherSubscriberNoLost, PubSub10TopicsReliable)
check_messages(SMALL_MESSAGE, 10, 0x80);
}
+TEST_P(PublisherSubscriberNoLost, PubSub1ContinousFragmentedTopic)
+{
+ std::string message(size_t(publisher_.get_mtu() * 8), 'A');
+ publisher_.publish(1, 0x80, 1, message);
+}
+
// TODO (#4423) Fix the non-reliable behavior when messages is higher than the agent history to enable this
/*TEST_P(PublisherSubscriberNoLost, PubSub30TopicsReliable)
{
@@ -172,6 +219,14 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(MiddlewareKind::FASTDDS, MiddlewareKind::FASTRTPS, MiddlewareKind::CED),
::testing::Values(0.0f)));
+INSTANTIATE_TEST_CASE_P(
+ TransportAndLostCustomTransports,
+ PublisherSubscriberNoLost,
+ ::testing::Combine(
+ ::testing::Values(Transport::CUSTOM_WITH_FRAMING, Transport::CUSTOM_WITHOUT_FRAMING),
+ ::testing::Values(MiddlewareKind::FASTDDS),
+ ::testing::Values(0.0f)));
+
TEST_P(PublisherSubscriberLost, PubSub1FragmentedTopic2Parts)
{
std::string message(size_t(publisher_.get_mtu() * 1.5), 'A');