Skip to content

Commit

Permalink
new CSubscriber method "IsPublished" to be aligned with CPublishers "…
Browse files Browse the repository at this point in the history
…IsSubscribed" (we may rename both functions to "IsConnected" in the future)

new additional check for IsPublished using the connection state of the matching subscriber (should ensure that IsPublished is flagged as true only if publisher is able to send data)
test added to test new behavior
  • Loading branch information
rex-schilasky committed Jun 19, 2024
1 parent f16f46a commit 55bd00e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 4 deletions.
7 changes: 7 additions & 0 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ namespace eCAL
**/
ECAL_API bool IsCreated() const {return(m_created);}

/**
* @brief Query if the subscriber is published.
*
* @return true if published, false if not.
**/
ECAL_API bool IsPublished() const;

/**
* @brief Query the number of publishers.
*
Expand Down
14 changes: 11 additions & 3 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -261,8 +261,16 @@ namespace eCAL
{
iter->second->ApplyLayerParameter(publication_info, tlayer.type, tlayer.par_layer);
}
// inform for publisher connection
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
// we only inform the subscriber when the publisher has already recognized at least one subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
const bool local_publication = publication_info.host_name == Process::GetHostName();
const bool external_publication = !local_publication;
const bool local_confirmed = local_publication && (ecal_sample_.topic.connections_loc > 0);
const bool external_confirmed = external_publication && (ecal_sample_.topic.connections_ext > 0);
if(local_confirmed || external_confirmed)
{
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ namespace eCAL
return(m_datareader->RemEventCallback(type_));
}

bool CSubscriber::IsPublished() const
{
if (m_datareader == nullptr) return(false);
return(m_datareader->IsPublished());
}

size_t CSubscriber::GetPublisherCount() const
{
if (m_datareader == nullptr) return(0);
Expand Down
96 changes: 95 additions & 1 deletion ecal/tests/cpp/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -216,3 +216,97 @@ TEST(core_cpp_pubsub, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start(false);
std::atomic<bool> subscriber_seen_at_publication_end(false);

std::atomic<bool> do_start_publication(false);
std::atomic<bool> publication_finished(false);

// publishing thread
auto publisher_thread = [&]() {
eCAL::Publisher::Configuration pub_config;
pub_config.shm.acknowledge_timeout_ms = 500;
eCAL::CPublisher pub("blob", pub_config);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines)
{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
if (sub.IsPublished()) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}

0 comments on commit 55bd00e

Please sign in to comment.