From 35c1fd91cbd8fa95ce0b801977b7f0a9b935032f Mon Sep 17 00:00:00 2001 From: Rex Schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Tue, 25 Jun 2024 15:06:32 +0200 Subject: [PATCH] [core] new IsPublished API for CSubscriber, improved state logic for IsPublished and GetPublisherCount --- ecal/core/include/ecal/ecal_subscriber.h | 7 ++ ecal/core/src/pubsub/ecal_subgate.cpp | 14 ++- ecal/core/src/pubsub/ecal_subscriber.cpp | 6 ++ .../pubsub_test/src/pubsub_receive_test.cpp | 96 ++++++++++++++++++- 4 files changed, 119 insertions(+), 4 deletions(-) diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 15f9cc3d43..71b78e92c4 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -241,6 +241,13 @@ namespace eCAL **/ ECAL_API bool IsCreated() const {return(m_created);} + /** + * @brief Query if the subscriber is published. + * + * @return true if published, false if not. + **/ + ECAL_API bool IsPublished() const; + /** * @brief Query the number of publishers. * diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 44261b5e63..a303dd0848 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -261,8 +261,16 @@ namespace eCAL { iter->second->ApplyLayerParameter(publication_info, tlayer.type, tlayer.par_layer); } - // inform for publisher connection - iter->second->ApplyPublication(publication_info, topic_information, layer_states); + // we only inform the subscriber when the publisher has already recognized at least one subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + const bool local_publication = publication_info.host_name == Process::GetHostName(); + const bool external_publication = !local_publication; + const bool local_confirmed = local_publication && (ecal_sample_.topic.connections_loc > 0); + const bool external_confirmed = external_publication && (ecal_sample_.topic.connections_ext > 0); + if(local_confirmed || external_confirmed) + { + iter->second->ApplyPublication(publication_info, topic_information, layer_states); + } } } diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index 0086f32afa..c5aaeea32f 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -172,6 +172,12 @@ namespace eCAL return(m_datareader->RemEventCallback(type_)); } + bool CSubscriber::IsPublished() const + { + if (m_datareader == nullptr) return(false); + return(m_datareader->IsPublished()); + } + size_t CSubscriber::GetPublisherCount() const { if (m_datareader == nullptr) return(0); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_receive_test.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_receive_test.cpp index 4e18f043e6..e61dc53bb2 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_receive_test.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_receive_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -216,3 +216,97 @@ TEST(core_cpp_pubsub, SporadicEmptyReceives) // finalize eCAL API EXPECT_EQ(0, eCAL::Finalize()); } + +TEST(PubSub, TestSubscriberSeen) +{ + // initialize eCAL API + EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen")); + + // enable data loopback + eCAL::Util::EnableLoopback(true); + + std::atomic subscriber_seen_at_publication_start(false); + std::atomic subscriber_seen_at_publication_end(false); + + std::atomic do_start_publication(false); + std::atomic publication_finished(false); + + // publishing thread + auto publisher_thread = [&]() { + eCAL::Publisher::Configuration pub_config; + pub_config.shm.acknowledge_timeout_ms = 500; + eCAL::CPublisher pub("blob", pub_config); + + int cnt(0); + const auto max_runs(1000); + while (eCAL::Ok()) + { + if (do_start_publication && cnt < max_runs) + { + if (cnt == 0) + { + subscriber_seen_at_publication_start = pub.IsSubscribed(); + } + + pub.Send(std::to_string(cnt)); + cnt++; + + if (cnt == max_runs) + { + subscriber_seen_at_publication_end = pub.IsSubscribed(); + publication_finished = true; + break; + } + } + } + }; + + // subscribing thread + auto subscriber_thread = [&]() { + eCAL::CSubscriber sub("blob"); + bool received(false); + auto max_lines(10); + auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + if (max_lines) + { + // the final log should look like this + // ----------------------------------- + // Receiving 0 + // Receiving 1 + // Receiving 2 + // Receiving 3 + // Receiving 4 + // Receiving 5 + // Receiving 6 + // Receiving 7 + // Receiving 8 + // Receiving 9 + // ----------------------------------- + std::cout << "Receiving " << std::string(static_cast(data_->buf), data_->size) << std::endl; + max_lines--; + } + }; + sub.AddReceiveCallback(receive_lambda); + + while (eCAL::Ok() && !publication_finished) + { + if (sub.IsPublished()) do_start_publication = true; + } + }; + + // create threads for publisher and subscriber + std::thread pub_thread(publisher_thread); + std::thread sub_thread(subscriber_thread); + + // join threads to the main thread + pub_thread.join(); + sub_thread.join(); + + // finalize eCAL API + eCAL::Finalize(); + + // check if the publisher has seen the subscriber + EXPECT_TRUE(subscriber_seen_at_publication_start); + EXPECT_TRUE(subscriber_seen_at_publication_end); +}