From e0a850adca0d1de6f59f1099d459e28b07f152ec Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 19 Jun 2024 18:08:04 +0200 Subject: [PATCH] new CSubscriber method "IsPublished" to be aligned with CPublishers "IsSubscribed" (we may rename both functions to "IsConnected" in the future) new additional check for IsPublished using the connection state of the matching subscriber (should ensure that IsPublished is flagged as true only if publisher is able to send data) test added to test new behavior --- ecal/core/include/ecal/ecal_subscriber.h | 9 +- ecal/core/src/pubsub/ecal_subgate.cpp | 18 +++- ecal/core/src/pubsub/ecal_subscriber.cpp | 8 +- ecal/core/src/readwrite/ecal_reader.h | 3 +- .../pubsub_test/src/pubsub_receive_test.cpp | 97 ++++++++++++++++++- 5 files changed, 124 insertions(+), 11 deletions(-) diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 9eff791399..96350e5695 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -298,6 +298,13 @@ namespace eCAL **/ ECAL_API bool IsCreated() const {return(m_created);} + /** + * @brief Query if the subscriber is published. + * + * @return true if published, false if not. + **/ + ECAL_API bool IsPublished() const; + /** * @brief Query the number of publishers. * diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 031cd0f93d..2d4cf66af3 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -295,8 +295,12 @@ namespace eCAL iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par); } - // inform for local publisher connection - iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least on local subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_loc() > 0) + { + iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + } } } @@ -345,8 +349,12 @@ namespace eCAL const std::string writer_par = tlayer.par_layer().SerializeAsString(); iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par); } - // inform for external publisher connection - iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least on external subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_ext() > 0) + { + iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + } } } diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index a5d92f9adc..dcd3b8f7f1 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -250,6 +250,12 @@ namespace eCAL return(m_datareader->RemEventCallback(type_)); } + bool CSubscriber::IsPublished() const + { + if (m_datareader == nullptr) return(false); + return(m_datareader->IsPublished()); + } + size_t CSubscriber::GetPublisherCount() const { if(m_datareader == nullptr) return(0); diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index ed84808463..61a973f55c 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,6 +96,7 @@ namespace eCAL bool IsCreated() const {return(m_created);} + bool IsPublished() const { return(m_loc_published || m_ext_published); } size_t GetPublisherCount() const { const std::lock_guard lock(m_pub_map_sync); diff --git a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp index d7059015a1..df5e9c0ef1 100644 --- a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -161,8 +161,6 @@ TEST(PubSub, TimingSubscriberReceive) EXPECT_EQ(0, eCAL::Finalize()); } - - // This tests test for sporadically received empty messages which were a problem. TEST(PubSub, SporadicEmptyReceives) { @@ -219,3 +217,96 @@ TEST(PubSub, SporadicEmptyReceives) // finalize eCAL API EXPECT_EQ(0, eCAL::Finalize()); } + +TEST(PubSub, TestSubscriberSeen) +{ + // initialize eCAL API + EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen")); + + // enable data loopback + eCAL::Util::EnableLoopback(true); + + std::atomic subscriber_seen_at_publication_start = false; + std::atomic subscriber_seen_at_publication_end = false; + + std::atomic do_start_publication = false; + std::atomic publication_finished = false; + + // publishing thread + auto publisher_thread = [&]() { + eCAL::CPublisher pub("blob"); + pub.ShmSetAcknowledgeTimeout(500); + + int cnt(0); + const auto max_runs(1000); + while (eCAL::Ok()) + { + if (do_start_publication && cnt < max_runs) + { + if (cnt == 0) + { + subscriber_seen_at_publication_start = pub.IsSubscribed(); + } + + pub.Send(std::to_string(cnt)); + cnt++; + + if (cnt == max_runs) + { + subscriber_seen_at_publication_end = pub.IsSubscribed(); + publication_finished = true; + break; + } + } + } + }; + + // subscribing thread + auto subscriber_thread = [&]() { + eCAL::CSubscriber sub("blob"); + bool received(false); + auto max_lines(10); + auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + if (max_lines) + { + // the final log should look like this + // ----------------------------------- + // Receiving 0 + // Receiving 1 + // Receiving 2 + // Receiving 3 + // Receiving 4 + // Receiving 5 + // Receiving 6 + // Receiving 7 + // Receiving 8 + // Receiving 9 + // ----------------------------------- + std::cout << "Receiving " << std::string(static_cast(data_->buf), data_->size) << std::endl; + max_lines--; + } + }; + sub.AddReceiveCallback(receive_lambda); + + while (eCAL::Ok() && !publication_finished) + { + if (sub.IsPublished()) do_start_publication = true; + } + }; + + // create threads for publisher and subscriber + std::thread pub_thread(publisher_thread); + std::thread sub_thread(subscriber_thread); + + // join threads to the main thread + pub_thread.join(); + sub_thread.join(); + + // finalize eCAL API + eCAL::Finalize(); + + // check if the publisher has seen the subscriber + EXPECT_TRUE(subscriber_seen_at_publication_start); + EXPECT_TRUE(subscriber_seen_at_publication_end); +}