From 668767066bc31024de55d805f21e77eae7cabc5f Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Mon, 23 Oct 2023 16:18:38 +0200 Subject: [PATCH] add 2 new event API functions gOpenNamedEvent and gOpenUnnamedEvent gOpenNamedEvent can handle ownership --- ecal/core/include/ecal/ecal_event.h | 23 ++++++ ecal/core/src/ecal_event.cpp | 73 ++++++++++++++++-- ecal/core/src/ecal_thread.cpp | 2 +- ecal/core/src/ecal_util.cpp | 2 +- ecal/core/src/ecalc.cpp | 2 +- ecal/core/src/io/ecal_memfile_pool.cpp | 4 +- ecal/core/src/io/ecal_memfile_sync.cpp | 6 +- ecal/core/src/pubsub/ecal_subgate.cpp | 2 +- samples/cpp/misc/event_rec/src/event_rec.cpp | 2 +- samples/cpp/misc/event_snd/src/event_snd.cpp | 2 +- testing/ecal/event_test/src/event_test.cpp | 9 ++- testing/ecal/pubsub_test/src/pubsub_test.cpp | 79 ++++++++++++++++++-- 12 files changed, 177 insertions(+), 29 deletions(-) diff --git a/ecal/core/include/ecal/ecal_event.h b/ecal/core/include/ecal/ecal_event.h index 5ce11a9018..fb5b1e690a 100644 --- a/ecal/core/include/ecal/ecal_event.h +++ b/ecal/core/include/ecal/ecal_event.h @@ -26,10 +26,32 @@ #include #include +#include + #include namespace eCAL { + /** + * @brief Open a named event with ownership. + * + * @param [out] event_ Returned event struct. + * @param event_name_ Event name. + * @param ownership_ Event is owned by the caller and will be destroyed on CloseEvent + * + * @return True if succeeded. + **/ + ECAL_API bool gOpenNamedEvent(eCAL::EventHandleT* event_, const std::string& event_name_, bool ownership_); + + /** + * @brief Open a named event with ownership. + * + * @param [out] event_ Returned event struct. + * + * @return True if succeeded. + **/ + ECAL_API bool gOpenUnnamedEvent(eCAL::EventHandleT* event_); + /** * @brief Open a named or unnamed event. * @@ -38,6 +60,7 @@ namespace eCAL * * @return True if succeeded. **/ + ECAL_DEPRECATE_SINCE_5_13("Use either gOpenNamedEvent or gOpenUnnamedEvent") ECAL_API bool gOpenEvent(eCAL::EventHandleT* event_, const std::string& event_name_ = ""); /** diff --git a/ecal/core/src/ecal_event.cpp b/ecal/core/src/ecal_event.cpp index e93ecb9344..9b81d2e122 100644 --- a/ecal/core/src/ecal_event.cpp +++ b/ecal/core/src/ecal_event.cpp @@ -18,7 +18,7 @@ */ /** - * @brief eCAL handle helper class - windows platform + * @brief eCAL handle helper class **/ #include @@ -35,12 +35,12 @@ #include "ecal_win_main.h" -namespace eCAL +namespace { - bool gOpenEvent(EventHandleT* event_, const std::string& event_name_) + bool OpenEvent(eCAL::EventHandleT* event_, const std::string& event_name_) { if(event_ == nullptr) return(false); - EventHandleT event; + eCAL::EventHandleT event; event.name = event_name_; event.handle = ::CreateEvent(nullptr, false, false, event_name_.c_str()); if(event.handle != nullptr) @@ -50,6 +50,25 @@ namespace eCAL } return(false); } +} + +namespace eCAL +{ + bool gOpenNamedEvent(eCAL::EventHandleT* event_, const std::string& event_name_, bool /*ownership_*/) + { + return OpenEvent(event_, event_name_); + } + + bool gOpenUnnamedEvent(eCAL::EventHandleT* event_) + { + return OpenEvent(event_, ""); + } + + // deprecated + bool gOpenEvent(EventHandleT* event_, const std::string& event_name_) + { + return OpenEvent(event_, event_name_); + } bool gCloseEvent(const EventHandleT& event_) { @@ -302,9 +321,10 @@ namespace eCAL class CNamedEvent { public: - explicit CNamedEvent(const std::string& name_) : + explicit CNamedEvent(const std::string& name_, bool ownership_) : m_name(name_ + "_evt"), - m_event(nullptr) + m_event(nullptr), + m_owner(ownership_) { m_name = (m_name[0] != '/') ? "/" + m_name : m_name; // make memory file path compatible for all posix systems m_event = named_event_open(m_name.c_str()); @@ -318,7 +338,10 @@ namespace eCAL { if(m_event == nullptr) return; named_event_close(m_event); - named_event_destroy(m_name.c_str()); + if(m_owner) + { + named_event_destroy(m_name.c_str()); + } } void set() @@ -371,8 +394,42 @@ namespace eCAL std::string m_name; named_event_t* m_event; + bool m_owner; }; + bool gOpenNamedEvent(EventHandleT* event_, const std::string& event_name_, bool ownership_) + { + if(event_ == nullptr) return(false); + + EventHandleT event; + event.name = event_name_; + event.handle = new CNamedEvent(event.name, ownership_); + + if(event.handle != nullptr) + { + *event_ = event; + return true; + } + return false; + } + + bool gOpenUnnamedEvent(EventHandleT* event_) + { + if(event_ == nullptr) return(false); + + EventHandleT event; + event.name = ""; + event.handle = new CEvent(); + + if(event.handle != nullptr) + { + *event_ = event; + return true; + } + return false; + } + + // deprecated bool gOpenEvent(EventHandleT* event_, const std::string& event_name_) { if(event_ == nullptr) return(false); @@ -386,7 +443,7 @@ namespace eCAL } else { - event.handle = new CNamedEvent(event.name); + event.handle = new CNamedEvent(event.name, true); } if(event.handle != nullptr) diff --git a/ecal/core/src/ecal_thread.cpp b/ecal/core/src/ecal_thread.cpp index e4bf8aed82..d118a53dbf 100644 --- a/ecal/core/src/ecal_thread.cpp +++ b/ecal/core/src/ecal_thread.cpp @@ -41,7 +41,7 @@ namespace eCAL { if(m_tdata.is_started) return(0); - gOpenEvent(&m_tdata.event); + gOpenUnnamedEvent(&m_tdata.event); m_tdata.do_stop = false; m_tdata.period = period_; m_tdata.ext_caller = ext_caller_; diff --git a/ecal/core/src/ecal_util.cpp b/ecal/core/src/ecal_util.cpp index df8806be9e..4590ecc939 100644 --- a/ecal/core/src/ecal_util.cpp +++ b/ecal/core/src/ecal_util.cpp @@ -91,7 +91,7 @@ namespace eCAL { const std::string event_name = EVENT_SHUTDOWN_PROC + std::string("_") + std::to_string(process_id_); EventHandleT event; - if (gOpenEvent(&event, event_name)) + if (gOpenNamedEvent(&event, event_name, true)) { std::cout << "Shutdown local eCAL process " << process_id_ << std::endl; gSetEvent(event); diff --git a/ecal/core/src/ecalc.cpp b/ecal/core/src/ecalc.cpp index 8cd8492b68..c8f68be5e8 100644 --- a/ecal/core/src/ecalc.cpp +++ b/ecal/core/src/ecalc.cpp @@ -469,7 +469,7 @@ extern "C" ECALC_API ECAL_HANDLE eCAL_Event_gOpenEvent(const char* event_name_) { eCAL::EventHandleT* event_handle = new eCAL::EventHandleT; - const bool success = eCAL::gOpenEvent(event_handle, event_name_); + const bool success = eCAL::gOpenNamedEvent(event_handle, event_name_, true); if (success) { return(event_handle); diff --git a/ecal/core/src/io/ecal_memfile_pool.cpp b/ecal/core/src/io/ecal_memfile_pool.cpp index a028b5780b..eb274c859c 100644 --- a/ecal/core/src/io/ecal_memfile_pool.cpp +++ b/ecal/core/src/io/ecal_memfile_pool.cpp @@ -54,8 +54,8 @@ namespace eCAL if (m_created) return false; // open memory file events - gOpenEvent(&m_event_snd, memfile_event_); - gOpenEvent(&m_event_ack, memfile_event_ + "_ack"); + gOpenNamedEvent(&m_event_snd, memfile_event_, false); + gOpenNamedEvent(&m_event_ack, memfile_event_ + "_ack", false); // create memory file access m_memfile.Create(memfile_name_.c_str(), false); diff --git a/ecal/core/src/io/ecal_memfile_sync.cpp b/ecal/core/src/io/ecal_memfile_sync.cpp index ee22d8a363..d32649c4d1 100644 --- a/ecal/core/src/io/ecal_memfile_sync.cpp +++ b/ecal/core/src/io/ecal_memfile_sync.cpp @@ -65,8 +65,8 @@ namespace eCAL if (iter == m_event_handle_map.end()) { SEventHandlePair event_pair; - gOpenEvent(&event_pair.event_snd, event_snd_name); - gOpenEvent(&event_pair.event_ack, event_ack_name); + gOpenNamedEvent(&event_pair.event_snd, event_snd_name, true); + gOpenNamedEvent(&event_pair.event_ack, event_ack_name, true); m_event_handle_map.insert(std::pair(process_id_, event_pair)); return true; } @@ -77,7 +77,7 @@ namespace eCAL // event was deactivated by a sync timeout in SendSyncEvents if (!gEventIsValid(iter->second.event_ack)) { - gOpenEvent(&iter->second.event_ack, event_ack_name); + gOpenNamedEvent(&iter->second.event_ack, event_ack_name, true); } // Set the ack event to valid again, so we will wait for the subscriber diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index de43f46526..1959a7dbf3 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -47,7 +47,7 @@ namespace eCAL static const std::string event_name(EVENT_SHUTDOWN_PROC + std::string("_") + std::to_string(Process::GetProcessID())); if (!gEventIsValid(evt)) { - gOpenEvent(&evt, event_name); + gOpenNamedEvent(&evt, event_name, true); } return(evt); } diff --git a/samples/cpp/misc/event_rec/src/event_rec.cpp b/samples/cpp/misc/event_rec/src/event_rec.cpp index 435c0f67a2..ac6b894ec4 100644 --- a/samples/cpp/misc/event_rec/src/event_rec.cpp +++ b/samples/cpp/misc/event_rec/src/event_rec.cpp @@ -30,7 +30,7 @@ int main(int /*argc*/, char** /*argv*/) // create named event eCAL::EventHandleT event_handle; - eCAL::gOpenEvent(&event_handle, event_name); + eCAL::gOpenNamedEvent(&event_handle, event_name, false); // timer auto start_time(std::chrono::steady_clock::now()); diff --git a/samples/cpp/misc/event_snd/src/event_snd.cpp b/samples/cpp/misc/event_snd/src/event_snd.cpp index 6825b9cdd8..6835fd2c0c 100644 --- a/samples/cpp/misc/event_snd/src/event_snd.cpp +++ b/samples/cpp/misc/event_snd/src/event_snd.cpp @@ -30,7 +30,7 @@ int main(int /*argc*/, char** /*argv*/) // create named event eCAL::EventHandleT event_handle; - eCAL::gOpenEvent(&event_handle, event_name); + eCAL::gOpenNamedEvent(&event_handle, event_name, true); // timer auto start_time(std::chrono::steady_clock::now()); diff --git a/testing/ecal/event_test/src/event_test.cpp b/testing/ecal/event_test/src/event_test.cpp index 6e02acc94f..b62cf8d836 100644 --- a/testing/ecal/event_test/src/event_test.cpp +++ b/testing/ecal/event_test/src/event_test.cpp @@ -28,14 +28,17 @@ TEST(Event, EventSetGet) // create named event eCAL::EventHandleT event_handle; - EXPECT_EQ(true, eCAL::gOpenEvent(&event_handle, event_name)); + EXPECT_EQ(true, eCAL::gOpenNamedEvent(&event_handle, event_name, true)); - // get none set event + // wait for event, expect false EXPECT_EQ(false, gWaitForEvent(event_handle, 10)); // set event EXPECT_EQ(true, gSetEvent(event_handle)); - // get set event + // wait for event, expect true now EXPECT_EQ(true, gWaitForEvent(event_handle, 100)); + + // close (and unlink/destroy) event + EXPECT_EQ(true, eCAL::gCloseEvent(event_handle)); } diff --git a/testing/ecal/pubsub_test/src/pubsub_test.cpp b/testing/ecal/pubsub_test/src/pubsub_test.cpp index 214412cc92..5d908e4453 100644 --- a/testing/ecal/pubsub_test/src/pubsub_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_test.cpp @@ -869,10 +869,6 @@ TEST(IO, MultipleSendsUDP) eCAL::Finalize(); } - - - - #if 0 TEST(IO, ZeroPayloadMessageTCP) { @@ -923,8 +919,6 @@ TEST(IO, ZeroPayloadMessageTCP) } #endif -#include -#include TEST(IO, DestroyInCallback) { /* Test setup : @@ -988,4 +982,75 @@ TEST(IO, DestroyInCallback) // finalize eCAL API // without destroying any pub / sub eCAL::Finalize(); -} \ No newline at end of file +} + +TEST(IO, SubscriberReconnection) +{ + /* Test setup : + * publisher runs permanently in a thread + * subscriber start reading + * subscriber gets out of scope (destruction) + * subscriber starts again in a new scope + * Test ensures that subscriber is reconnecting and all sync mechanism are working properly again. + */ + + // initialize eCAL API + eCAL::Initialize(0, nullptr, "SubscriberReconnection"); + + // enable loop back communication in the same thread + eCAL::Util::EnableLoopback(true); + + // start publishing thread + std::atomic stop_publishing(false); + eCAL::string::CPublisher pub_foo("foo"); + std::thread pub_foo_t([&pub_foo, &stop_publishing]() { + while (!stop_publishing) + { + pub_foo.Send("Hello World"); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + std::cout << "Stopped publishing" << std::endl; + }); + + // scope 1 + { + size_t callback_received_count(0); + + eCAL::string::CSubscriber sub_foo("foo"); + auto receive_lambda = [&sub_foo, &callback_received_count](const char* /*topic_*/, const std::string& /*msg*/, long long /*time_*/, long long /*clock_*/, long long /*id_*/) { + std::cout << "Receiving in scope 1" << std::endl; + callback_received_count++; + }; + sub_foo.AddReceiveCallback(receive_lambda); + + // sleep for 2 seconds, we should receive something + std::this_thread::sleep_for(std::chrono::seconds(2)); + + EXPECT_TRUE(callback_received_count > 0); + } + + // scope 2 + { + size_t callback_received_count(0); + + eCAL::string::CSubscriber sub_foo("foo"); + auto receive_lambda = [&sub_foo, &callback_received_count](const char* /*topic_*/, const std::string& /*msg*/, long long /*time_*/, long long /*clock_*/, long long /*id_*/) { + std::cout << "Receiving in scope 2" << std::endl; + callback_received_count++; + }; + sub_foo.AddReceiveCallback(receive_lambda); + + // sleep for 2 seconds, we should receive something + std::this_thread::sleep_for(std::chrono::seconds(2)); + + EXPECT_TRUE(callback_received_count > 0); + } + + // stop publishing and join thread + stop_publishing = true; + pub_foo_t.join(); + + // finalize eCAL API + // without destroying any pub / sub + eCAL::Finalize(); +}