From 940f805464d8abdab73cb49839a4d8b9c864a587 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez?= Date: Tue, 6 Aug 2019 11:28:48 +0200 Subject: [PATCH] Bugfix: Publisher crashes when there are more than one publisher. [6179] (#659) * Refs #6178. Written regression test. * Refs #6178. Fixed bug. * Refs #6178. Little change. --- src/cpp/rtps/reader/RTPSReader.cpp | 12 +-- test/communication/CMakeLists.txt | 38 +++++++++ test/communication/Subscriber.cpp | 62 ++++++++++++--- .../two_publishers_communication.py | 79 +++++++++++++++++++ 4 files changed, 177 insertions(+), 14 deletions(-) create mode 100644 test/communication/two_publishers_communication.py diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index 88f3b6b50a8..df920773776 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -114,9 +114,10 @@ void RTPSReader::add_persistence_guid( const GUID_t& guid, const GUID_t& persistence_guid) { + GUID_t persistence_guid_to_store = (c_Guid_Unknown == persistence_guid) ? guid : persistence_guid; std::lock_guard guard(mp_mutex); - history_state_->persistence_guid_map[guid] = persistence_guid; - history_state_->persistence_guid_count[persistence_guid]++; + history_state_->persistence_guid_map[guid] = persistence_guid_to_store; + history_state_->persistence_guid_count[persistence_guid_to_store]++; } void RTPSReader::remove_persistence_guid( @@ -124,19 +125,20 @@ void RTPSReader::remove_persistence_guid( const GUID_t& persistence_guid) { std::lock_guard guard(mp_mutex); + GUID_t persistence_guid_stored = (c_Guid_Unknown == persistence_guid) ? guid : persistence_guid; history_state_->persistence_guid_map.erase(guid); - auto count = --history_state_->persistence_guid_count[persistence_guid]; + auto count = --history_state_->persistence_guid_count[persistence_guid_stored]; if (count == 0) { if (m_att.durabilityKind < TRANSIENT) { - history_state_->history_record.erase(persistence_guid); + history_state_->history_record.erase(persistence_guid_stored); } } } SequenceNumber_t RTPSReader::update_last_notified( - const GUID_t& guid, + const GUID_t& guid, const SequenceNumber_t& seq) { SequenceNumber_t ret_val; diff --git a/test/communication/CMakeLists.txt b/test/communication/CMakeLists.txt index 1dc03b3bfca..00d2a95be3e 100644 --- a/test/communication/CMakeLists.txt +++ b/test/communication/CMakeLists.txt @@ -48,6 +48,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) ############################################################################### configure_file(${CMAKE_CURRENT_SOURCE_DIR}/simple_communication.py ${CMAKE_CURRENT_BINARY_DIR}/simple_communication.py COPYONLY) + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/two_publishers_communication.py + ${CMAKE_CURRENT_BINARY_DIR}/two_publishers_communication.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/liveliness_assertion.py ${CMAKE_CURRENT_BINARY_DIR}/liveliness_assertion.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/automatic_liveliness_assertion.py @@ -194,5 +196,41 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) set_property(TEST AutomaticLivelinessAssertion APPEND PROPERTY ENVIRONMENT "PATH=$\\;$\\;${WIN_PATH}") endif() + + add_test(NAME TwoPublishersCommunicationBestEffort + COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/two_publishers_communication.py) + + # Set test with label NoMemoryCheck + set_property(TEST TwoPublishersCommunicationBestEffort PROPERTY LABELS "NoMemoryCheck") + + set_property(TEST TwoPublishersCommunicationBestEffort PROPERTY ENVIRONMENT + "SIMPLE_COMMUNICATION_PUBLISHER_BIN=$") + set_property(TEST TwoPublishersCommunicationBestEffort APPEND PROPERTY ENVIRONMENT + "SIMPLE_COMMUNICATION_SUBSCRIBER_BIN=$") + set_property(TEST TwoPublishersCommunicationBestEffort APPEND PROPERTY ENVIRONMENT + "XML_FILE=simple_besteffort.xml") + if(WIN32) + string(REPLACE ";" "\\;" WIN_PATH "$ENV{PATH}") + set_property(TEST TwoPublishersCommunicationBestEffort APPEND PROPERTY ENVIRONMENT + "PATH=$\\;$\\;${WIN_PATH}") + endif() + + add_test(NAME TwoPublishersCommunicationReliable + COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/two_publishers_communication.py) + + # Set test with label NoMemoryCheck + set_property(TEST TwoPublishersCommunicationReliable PROPERTY LABELS "NoMemoryCheck") + + set_property(TEST TwoPublishersCommunicationReliable PROPERTY ENVIRONMENT + "SIMPLE_COMMUNICATION_PUBLISHER_BIN=$") + set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT + "SIMPLE_COMMUNICATION_SUBSCRIBER_BIN=$") + set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT + "XML_FILE=simple_reliable.xml") + if(WIN32) + string(REPLACE ";" "\\;" WIN_PATH "$ENV{PATH}") + set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT + "PATH=$\\;$\\;${WIN_PATH}") + endif() endif() endif() diff --git a/test/communication/Subscriber.cpp b/test/communication/Subscriber.cpp index 379845b8a00..b77cb0634ec 100644 --- a/test/communication/Subscriber.cpp +++ b/test/communication/Subscriber.cpp @@ -35,6 +35,7 @@ #include #include #include +#include using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -101,9 +102,10 @@ class SubListener : public SubscriberListener { public: - SubListener() : number_samples_(0) {} - - ~SubListener() {} + SubListener(const uint32_t max_number_samples) + : max_number_samples_(max_number_samples) + { + } void onSubscriptionMatched(Subscriber* /*subscriber*/, MatchingInfo& info) override { @@ -127,10 +129,13 @@ class SubListener : public SubscriberListener if(info.sampleKind == ALIVE) { std::unique_lock lock(mutex_); - ++number_samples_; - std::cout << "Received sample: index(" << sample.index() << "), message(" + std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " << + info.sample_identity.sequence_number() << "): index(" << sample.index() << "), message(" << sample.message() << ")" << std::endl; - cv_.notify_all(); + if(max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()]) + { + cv_.notify_all(); + } } } } @@ -153,7 +158,8 @@ class SubListener : public SubscriberListener std::mutex mutex_; std::condition_variable cv_; - unsigned int number_samples_; + const uint32_t max_number_samples_; + std::map number_samples_; }; int main(int argc, char** argv) @@ -162,6 +168,7 @@ int main(int argc, char** argv) bool notexit = false; uint32_t seed = 7800; uint32_t samples = 4; + uint32_t publishers = 1; char* xml_file = nullptr; std::string magic; @@ -211,6 +218,16 @@ int main(int argc, char** argv) xml_file = argv[arg_count]; } + else if(strcmp(argv[arg_count], "--publishers") == 0) + { + if(++arg_count >= argc) + { + std::cout << "--publishers expects a parameter" << std::endl; + return -1; + } + + publishers = strtol(argv[arg_count], nullptr, 10); + } ++arg_count; } @@ -233,7 +250,7 @@ int main(int argc, char** argv) HelloWorldType type; Domain::registerType(participant, &type); - SubListener listener; + SubListener listener(samples); // Generate topic name std::ostringstream topic; @@ -263,10 +280,37 @@ int main(int argc, char** argv) if (run) { std::unique_lock lock(listener.mutex_); - listener.cv_.wait(lock, [&]{ return listener.number_samples_ >= samples; }); + listener.cv_.wait(lock, [&] + { + if(publishers < listener.number_samples_.size()) + { + // Will fail later. + return true; + } + else if( publishers > listener.number_samples_.size()) + { + return false; + } + + for(auto& number_samples : listener.number_samples_) + { + if(samples > number_samples.second) + { + return false; + } + } + + return true; + }); } Domain::removeParticipant(participant); + if(publishers < listener.number_samples_.size()) + { + std::cout << "ERROR: detected more than " << publishers << " publishers" << std::endl; + return -1; + } + return 0; } diff --git a/test/communication/two_publishers_communication.py b/test/communication/two_publishers_communication.py new file mode 100644 index 00000000000..3636cf2fe75 --- /dev/null +++ b/test/communication/two_publishers_communication.py @@ -0,0 +1,79 @@ +# Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys, os, subprocess, glob, time + +script_dir = os.path.dirname(os.path.realpath(__file__)) + +publisher_command = os.environ.get("SIMPLE_COMMUNICATION_PUBLISHER_BIN") +if not publisher_command: + publisher_files = glob.glob(os.path.join(script_dir, "**/SimpleCommunicationPublisher*"), recursive=True) + pf = iter(publisher_files) + publisher_command = next(pf, None) + while publisher_command and (not os.path.isfile(publisher_command) or not os.access(publisher_command, + os.X_OK)): + publisher_command = next(pf, None) +assert publisher_command +subscriber_command = os.environ.get("SIMPLE_COMMUNICATION_SUBSCRIBER_BIN") +if not subscriber_command: + subscriber_files = glob.glob(os.path.join(script_dir, "**/SimpleCommunicationSubscriber*"), recursive=True) + pf = iter(subscriber_files) + subscriber_command = next(pf, None) + while subscriber_command and (not os.path.isfile(subscriber_command) or not os.access(subscriber_command, + os.X_OK)): + subscriber_command = next(pf, None) +assert subscriber_command + +extra_pub_arg = os.environ.get("EXTRA_PUB_ARG") +if extra_pub_arg: + extra_pub_args = extra_pub_arg.split() +else: + extra_pub_args = [] + +real_xml_file_pub = None +real_xml_file_sub = None +xml_file = os.environ.get("XML_FILE") +if xml_file: + real_xml_file_pub = os.path.join(script_dir, xml_file) + real_xml_file_sub = os.path.join(script_dir, xml_file) +else: + xml_file_pub = os.environ.get("XML_FILE_PUB") + if xml_file_pub: + real_xml_file_pub = os.path.join(script_dir, xml_file_pub) + xml_file_sub = os.environ.get("XML_FILE_SUB") + if xml_file_sub: + real_xml_file_sub = os.path.join(script_dir, xml_file_sub) + +subscriber1_proc = subprocess.Popen([subscriber_command, "--seed", str(os.getpid()), "--publishers", "2", "--samples", "10"] + + (["--xmlfile", real_xml_file_sub] if real_xml_file_sub else [])) +publisher1_proc = subprocess.Popen([publisher_command, "--seed", str(os.getpid()), "--samples", "10"] + + (["--xmlfile", real_xml_file_pub] if real_xml_file_pub else []) + + extra_pub_args) +time.sleep(1) +publisher2_proc = subprocess.Popen([publisher_command, "--seed", str(os.getpid()), "--samples", "10"] + + (["--xmlfile", real_xml_file_pub] if real_xml_file_pub else []) + + extra_pub_args) +subscriber2_proc = subprocess.Popen([subscriber_command, "--seed", str(os.getpid()), "--publishers", "2", "--samples", "10"] + + (["--xmlfile", real_xml_file_sub] if real_xml_file_sub else [])) + +subscriber1_proc.communicate() +retvalue = subscriber1_proc.returncode +subscriber2_proc.communicate() +if retvalue == 0: + retvalue = subscriber2_proc.returncode + +publisher1_proc.kill() +publisher2_proc.kill() + +sys.exit(retvalue)