From 8e9e3844d26fd1646ef90acc066833dc8845fe90 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Tue, 17 Oct 2023 09:46:00 +0200 Subject: [PATCH] pub/sub reconnection gtest added --- testing/ecal/pubsub_test/src/pubsub_test.cpp | 79 ++++++++++++++++++-- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/testing/ecal/pubsub_test/src/pubsub_test.cpp b/testing/ecal/pubsub_test/src/pubsub_test.cpp index 214412cc92..5d908e4453 100644 --- a/testing/ecal/pubsub_test/src/pubsub_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_test.cpp @@ -869,10 +869,6 @@ TEST(IO, MultipleSendsUDP) eCAL::Finalize(); } - - - - #if 0 TEST(IO, ZeroPayloadMessageTCP) { @@ -923,8 +919,6 @@ TEST(IO, ZeroPayloadMessageTCP) } #endif -#include -#include TEST(IO, DestroyInCallback) { /* Test setup : @@ -988,4 +982,75 @@ TEST(IO, DestroyInCallback) // finalize eCAL API // without destroying any pub / sub eCAL::Finalize(); -} \ No newline at end of file +} + +TEST(IO, SubscriberReconnection) +{ + /* Test setup : + * publisher runs permanently in a thread + * subscriber start reading + * subscriber gets out of scope (destruction) + * subscriber starts again in a new scope + * Test ensures that subscriber is reconnecting and all sync mechanism are working properly again. + */ + + // initialize eCAL API + eCAL::Initialize(0, nullptr, "SubscriberReconnection"); + + // enable loop back communication in the same thread + eCAL::Util::EnableLoopback(true); + + // start publishing thread + std::atomic stop_publishing(false); + eCAL::string::CPublisher pub_foo("foo"); + std::thread pub_foo_t([&pub_foo, &stop_publishing]() { + while (!stop_publishing) + { + pub_foo.Send("Hello World"); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + std::cout << "Stopped publishing" << std::endl; + }); + + // scope 1 + { + size_t callback_received_count(0); + + eCAL::string::CSubscriber sub_foo("foo"); + auto receive_lambda = [&sub_foo, &callback_received_count](const char* /*topic_*/, const std::string& /*msg*/, long long /*time_*/, long long /*clock_*/, long long /*id_*/) { + std::cout << "Receiving in scope 1" << std::endl; + callback_received_count++; + }; + sub_foo.AddReceiveCallback(receive_lambda); + + // sleep for 2 seconds, we should receive something + std::this_thread::sleep_for(std::chrono::seconds(2)); + + EXPECT_TRUE(callback_received_count > 0); + } + + // scope 2 + { + size_t callback_received_count(0); + + eCAL::string::CSubscriber sub_foo("foo"); + auto receive_lambda = [&sub_foo, &callback_received_count](const char* /*topic_*/, const std::string& /*msg*/, long long /*time_*/, long long /*clock_*/, long long /*id_*/) { + std::cout << "Receiving in scope 2" << std::endl; + callback_received_count++; + }; + sub_foo.AddReceiveCallback(receive_lambda); + + // sleep for 2 seconds, we should receive something + std::this_thread::sleep_for(std::chrono::seconds(2)); + + EXPECT_TRUE(callback_received_count > 0); + } + + // stop publishing and join thread + stop_publishing = true; + pub_foo_t.join(); + + // finalize eCAL API + // without destroying any pub / sub + eCAL::Finalize(); +}