diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 9eff791399..96350e5695 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -298,6 +298,13 @@ namespace eCAL **/ ECAL_API bool IsCreated() const {return(m_created);} + /** + * @brief Query if the subscriber is published. + * + * @return true if published, false if not. + **/ + ECAL_API bool IsPublished() const; + /** * @brief Query the number of publishers. * diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 031cd0f93d..2d4cf66af3 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -295,8 +295,12 @@ namespace eCAL iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par); } - // inform for local publisher connection - iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least on local subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_loc() > 0) + { + iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + } } } @@ -345,8 +349,12 @@ namespace eCAL const std::string writer_par = tlayer.par_layer().SerializeAsString(); iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par); } - // inform for external publisher connection - iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least on external subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_ext() > 0) + { + iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + } } } diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index a5d92f9adc..dcd3b8f7f1 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -250,6 +250,12 @@ namespace eCAL return(m_datareader->RemEventCallback(type_)); } + bool CSubscriber::IsPublished() const + { + if (m_datareader == nullptr) return(false); + return(m_datareader->IsPublished()); + } + size_t CSubscriber::GetPublisherCount() const { if(m_datareader == nullptr) return(0); diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index ed84808463..61a973f55c 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,6 +96,7 @@ namespace eCAL bool IsCreated() const {return(m_created);} + bool IsPublished() const { return(m_loc_published || m_ext_published); } size_t GetPublisherCount() const { const std::lock_guard lock(m_pub_map_sync); diff --git a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp index d7059015a1..df5e9c0ef1 100644 --- a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -161,8 +161,6 @@ TEST(PubSub, TimingSubscriberReceive) EXPECT_EQ(0, eCAL::Finalize()); } - - // This tests test for sporadically received empty messages which were a problem. TEST(PubSub, SporadicEmptyReceives) { @@ -219,3 +217,96 @@ TEST(PubSub, SporadicEmptyReceives) // finalize eCAL API EXPECT_EQ(0, eCAL::Finalize()); } + +TEST(PubSub, TestSubscriberSeen) +{ + // initialize eCAL API + EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen")); + + // enable data loopback + eCAL::Util::EnableLoopback(true); + + std::atomic subscriber_seen_at_publication_start = false; + std::atomic subscriber_seen_at_publication_end = false; + + std::atomic do_start_publication = false; + std::atomic publication_finished = false; + + // publishing thread + auto publisher_thread = [&]() { + eCAL::CPublisher pub("blob"); + pub.ShmSetAcknowledgeTimeout(500); + + int cnt(0); + const auto max_runs(1000); + while (eCAL::Ok()) + { + if (do_start_publication && cnt < max_runs) + { + if (cnt == 0) + { + subscriber_seen_at_publication_start = pub.IsSubscribed(); + } + + pub.Send(std::to_string(cnt)); + cnt++; + + if (cnt == max_runs) + { + subscriber_seen_at_publication_end = pub.IsSubscribed(); + publication_finished = true; + break; + } + } + } + }; + + // subscribing thread + auto subscriber_thread = [&]() { + eCAL::CSubscriber sub("blob"); + bool received(false); + auto max_lines(10); + auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + if (max_lines) + { + // the final log should look like this + // ----------------------------------- + // Receiving 0 + // Receiving 1 + // Receiving 2 + // Receiving 3 + // Receiving 4 + // Receiving 5 + // Receiving 6 + // Receiving 7 + // Receiving 8 + // Receiving 9 + // ----------------------------------- + std::cout << "Receiving " << std::string(static_cast(data_->buf), data_->size) << std::endl; + max_lines--; + } + }; + sub.AddReceiveCallback(receive_lambda); + + while (eCAL::Ok() && !publication_finished) + { + if (sub.IsPublished()) do_start_publication = true; + } + }; + + // create threads for publisher and subscriber + std::thread pub_thread(publisher_thread); + std::thread sub_thread(subscriber_thread); + + // join threads to the main thread + pub_thread.join(); + sub_thread.join(); + + // finalize eCAL API + eCAL::Finalize(); + + // check if the publisher has seen the subscriber + EXPECT_TRUE(subscriber_seen_at_publication_start); + EXPECT_TRUE(subscriber_seen_at_publication_end); +}