Skip to content

Commit

Permalink
Bugfix: Publisher crashes when there are more than one publisher. [61…
Browse files Browse the repository at this point in the history
…79] (#659)

* Refs #6178. Written regression test.

* Refs #6178. Fixed bug.

* Refs #6178. Little change.
  • Loading branch information
richiware authored and raquelalvarezbanos committed Aug 6, 2019
1 parent daf8b3b commit 940f805
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 14 deletions.
12 changes: 7 additions & 5 deletions src/cpp/rtps/reader/RTPSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,29 +114,31 @@ void RTPSReader::add_persistence_guid(
const GUID_t& guid,
const GUID_t& persistence_guid)
{
GUID_t persistence_guid_to_store = (c_Guid_Unknown == persistence_guid) ? guid : persistence_guid;
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
history_state_->persistence_guid_map[guid] = persistence_guid;
history_state_->persistence_guid_count[persistence_guid]++;
history_state_->persistence_guid_map[guid] = persistence_guid_to_store;
history_state_->persistence_guid_count[persistence_guid_to_store]++;
}

void RTPSReader::remove_persistence_guid(
const GUID_t& guid,
const GUID_t& persistence_guid)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
GUID_t persistence_guid_stored = (c_Guid_Unknown == persistence_guid) ? guid : persistence_guid;
history_state_->persistence_guid_map.erase(guid);
auto count = --history_state_->persistence_guid_count[persistence_guid];
auto count = --history_state_->persistence_guid_count[persistence_guid_stored];
if (count == 0)
{
if (m_att.durabilityKind < TRANSIENT)
{
history_state_->history_record.erase(persistence_guid);
history_state_->history_record.erase(persistence_guid_stored);
}
}
}

SequenceNumber_t RTPSReader::update_last_notified(
const GUID_t& guid,
const GUID_t& guid,
const SequenceNumber_t& seq)
{
SequenceNumber_t ret_val;
Expand Down
38 changes: 38 additions & 0 deletions test/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND)
###############################################################################
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/simple_communication.py
${CMAKE_CURRENT_BINARY_DIR}/simple_communication.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/two_publishers_communication.py
${CMAKE_CURRENT_BINARY_DIR}/two_publishers_communication.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/liveliness_assertion.py
${CMAKE_CURRENT_BINARY_DIR}/liveliness_assertion.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/automatic_liveliness_assertion.py
Expand Down Expand Up @@ -194,5 +196,41 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND)
set_property(TEST AutomaticLivelinessAssertion APPEND PROPERTY ENVIRONMENT
"PATH=$<TARGET_FILE_DIR:${PROJECT_NAME}>\\;$<TARGET_FILE_DIR:fastcdr>\\;${WIN_PATH}")
endif()

add_test(NAME TwoPublishersCommunicationBestEffort
COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/two_publishers_communication.py)

# Set test with label NoMemoryCheck
set_property(TEST TwoPublishersCommunicationBestEffort PROPERTY LABELS "NoMemoryCheck")

set_property(TEST TwoPublishersCommunicationBestEffort PROPERTY ENVIRONMENT
"SIMPLE_COMMUNICATION_PUBLISHER_BIN=$<TARGET_FILE:SimpleCommunicationPublisher>")
set_property(TEST TwoPublishersCommunicationBestEffort APPEND PROPERTY ENVIRONMENT
"SIMPLE_COMMUNICATION_SUBSCRIBER_BIN=$<TARGET_FILE:SimpleCommunicationSubscriber>")
set_property(TEST TwoPublishersCommunicationBestEffort APPEND PROPERTY ENVIRONMENT
"XML_FILE=simple_besteffort.xml")
if(WIN32)
string(REPLACE ";" "\\;" WIN_PATH "$ENV{PATH}")
set_property(TEST TwoPublishersCommunicationBestEffort APPEND PROPERTY ENVIRONMENT
"PATH=$<TARGET_FILE_DIR:${PROJECT_NAME}>\\;$<TARGET_FILE_DIR:fastcdr>\\;${WIN_PATH}")
endif()

add_test(NAME TwoPublishersCommunicationReliable
COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/two_publishers_communication.py)

# Set test with label NoMemoryCheck
set_property(TEST TwoPublishersCommunicationReliable PROPERTY LABELS "NoMemoryCheck")

set_property(TEST TwoPublishersCommunicationReliable PROPERTY ENVIRONMENT
"SIMPLE_COMMUNICATION_PUBLISHER_BIN=$<TARGET_FILE:SimpleCommunicationPublisher>")
set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT
"SIMPLE_COMMUNICATION_SUBSCRIBER_BIN=$<TARGET_FILE:SimpleCommunicationSubscriber>")
set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT
"XML_FILE=simple_reliable.xml")
if(WIN32)
string(REPLACE ";" "\\;" WIN_PATH "$ENV{PATH}")
set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT
"PATH=$<TARGET_FILE_DIR:${PROJECT_NAME}>\\;$<TARGET_FILE_DIR:fastcdr>\\;${WIN_PATH}")
endif()
endif()
endif()
62 changes: 53 additions & 9 deletions test/communication/Subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <condition_variable>
#include <fstream>
#include <string>
#include <map>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
Expand Down Expand Up @@ -101,9 +102,10 @@ class SubListener : public SubscriberListener
{
public:

SubListener() : number_samples_(0) {}

~SubListener() {}
SubListener(const uint32_t max_number_samples)
: max_number_samples_(max_number_samples)
{
}

void onSubscriptionMatched(Subscriber* /*subscriber*/, MatchingInfo& info) override
{
Expand All @@ -127,10 +129,13 @@ class SubListener : public SubscriberListener
if(info.sampleKind == ALIVE)
{
std::unique_lock<std::mutex> lock(mutex_);
++number_samples_;
std::cout << "Received sample: index(" << sample.index() << "), message("
std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " <<
info.sample_identity.sequence_number() << "): index(" << sample.index() << "), message("
<< sample.message() << ")" << std::endl;
cv_.notify_all();
if(max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()])
{
cv_.notify_all();
}
}
}
}
Expand All @@ -153,7 +158,8 @@ class SubListener : public SubscriberListener

std::mutex mutex_;
std::condition_variable cv_;
unsigned int number_samples_;
const uint32_t max_number_samples_;
std::map<GUID_t, uint32_t> number_samples_;
};

int main(int argc, char** argv)
Expand All @@ -162,6 +168,7 @@ int main(int argc, char** argv)
bool notexit = false;
uint32_t seed = 7800;
uint32_t samples = 4;
uint32_t publishers = 1;
char* xml_file = nullptr;
std::string magic;

Expand Down Expand Up @@ -211,6 +218,16 @@ int main(int argc, char** argv)

xml_file = argv[arg_count];
}
else if(strcmp(argv[arg_count], "--publishers") == 0)
{
if(++arg_count >= argc)
{
std::cout << "--publishers expects a parameter" << std::endl;
return -1;
}

publishers = strtol(argv[arg_count], nullptr, 10);
}

++arg_count;
}
Expand All @@ -233,7 +250,7 @@ int main(int argc, char** argv)
HelloWorldType type;
Domain::registerType(participant, &type);

SubListener listener;
SubListener listener(samples);

// Generate topic name
std::ostringstream topic;
Expand Down Expand Up @@ -263,10 +280,37 @@ int main(int argc, char** argv)
if (run)
{
std::unique_lock<std::mutex> lock(listener.mutex_);
listener.cv_.wait(lock, [&]{ return listener.number_samples_ >= samples; });
listener.cv_.wait(lock, [&]
{
if(publishers < listener.number_samples_.size())
{
// Will fail later.
return true;
}
else if( publishers > listener.number_samples_.size())
{
return false;
}

for(auto& number_samples : listener.number_samples_)
{
if(samples > number_samples.second)
{
return false;
}
}

return true;
});
}

Domain::removeParticipant(participant);

if(publishers < listener.number_samples_.size())
{
std::cout << "ERROR: detected more than " << publishers << " publishers" << std::endl;
return -1;
}

return 0;
}
79 changes: 79 additions & 0 deletions test/communication/two_publishers_communication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys, os, subprocess, glob, time

script_dir = os.path.dirname(os.path.realpath(__file__))

publisher_command = os.environ.get("SIMPLE_COMMUNICATION_PUBLISHER_BIN")
if not publisher_command:
publisher_files = glob.glob(os.path.join(script_dir, "**/SimpleCommunicationPublisher*"), recursive=True)
pf = iter(publisher_files)
publisher_command = next(pf, None)
while publisher_command and (not os.path.isfile(publisher_command) or not os.access(publisher_command,
os.X_OK)):
publisher_command = next(pf, None)
assert publisher_command
subscriber_command = os.environ.get("SIMPLE_COMMUNICATION_SUBSCRIBER_BIN")
if not subscriber_command:
subscriber_files = glob.glob(os.path.join(script_dir, "**/SimpleCommunicationSubscriber*"), recursive=True)
pf = iter(subscriber_files)
subscriber_command = next(pf, None)
while subscriber_command and (not os.path.isfile(subscriber_command) or not os.access(subscriber_command,
os.X_OK)):
subscriber_command = next(pf, None)
assert subscriber_command

extra_pub_arg = os.environ.get("EXTRA_PUB_ARG")
if extra_pub_arg:
extra_pub_args = extra_pub_arg.split()
else:
extra_pub_args = []

real_xml_file_pub = None
real_xml_file_sub = None
xml_file = os.environ.get("XML_FILE")
if xml_file:
real_xml_file_pub = os.path.join(script_dir, xml_file)
real_xml_file_sub = os.path.join(script_dir, xml_file)
else:
xml_file_pub = os.environ.get("XML_FILE_PUB")
if xml_file_pub:
real_xml_file_pub = os.path.join(script_dir, xml_file_pub)
xml_file_sub = os.environ.get("XML_FILE_SUB")
if xml_file_sub:
real_xml_file_sub = os.path.join(script_dir, xml_file_sub)

subscriber1_proc = subprocess.Popen([subscriber_command, "--seed", str(os.getpid()), "--publishers", "2", "--samples", "10"]
+ (["--xmlfile", real_xml_file_sub] if real_xml_file_sub else []))
publisher1_proc = subprocess.Popen([publisher_command, "--seed", str(os.getpid()), "--samples", "10"]
+ (["--xmlfile", real_xml_file_pub] if real_xml_file_pub else [])
+ extra_pub_args)
time.sleep(1)
publisher2_proc = subprocess.Popen([publisher_command, "--seed", str(os.getpid()), "--samples", "10"]
+ (["--xmlfile", real_xml_file_pub] if real_xml_file_pub else [])
+ extra_pub_args)
subscriber2_proc = subprocess.Popen([subscriber_command, "--seed", str(os.getpid()), "--publishers", "2", "--samples", "10"]
+ (["--xmlfile", real_xml_file_sub] if real_xml_file_sub else []))

subscriber1_proc.communicate()
retvalue = subscriber1_proc.returncode
subscriber2_proc.communicate()
if retvalue == 0:
retvalue = subscriber2_proc.returncode

publisher1_proc.kill()
publisher2_proc.kill()

sys.exit(retvalue)

0 comments on commit 940f805

Please sign in to comment.