Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] add/improved IsPublished/IsSubscribed logic (master) #1633

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ namespace eCAL
**/
ECAL_API bool IsCreated() const {return(m_created);}

/**
* @brief Query if the subscriber is published.
*
* @return true if published, false if not.
**/
ECAL_API bool IsPublished() const;

/**
* @brief Query the number of publishers.
*
Expand Down
14 changes: 11 additions & 3 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -261,8 +261,16 @@ namespace eCAL
{
iter->second->ApplyLayerParameter(publication_info, tlayer.type, tlayer.par_layer);
}
// inform for publisher connection
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
// we only inform the subscriber when the publisher has already recognized at least one subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
const bool local_publication = publication_info.host_name == Process::GetHostName();
const bool external_publication = !local_publication;
const bool local_confirmed = local_publication && (ecal_sample_.topic.connections_loc > 0);
const bool external_confirmed = external_publication && (ecal_sample_.topic.connections_ext > 0);
if(local_confirmed || external_confirmed)
{
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ namespace eCAL
return(m_datareader->RemEventCallback(type_));
}

bool CSubscriber::IsPublished() const
{
if (m_datareader == nullptr) return(false);
return(m_datareader->IsPublished());
}

size_t CSubscriber::GetPublisherCount() const
{
if (m_datareader == nullptr) return(0);
Expand Down
96 changes: 95 additions & 1 deletion ecal/tests/cpp/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -216,3 +216,97 @@ TEST(core_cpp_pubsub, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: all parameters should be named in a function [readability-named-parameter]

Suggested change
TEST(PubSub, TestSubscriberSeen)
TEST(PubSub /*unused*/, TestSubscriberSeen /*unused*/)

{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start(false);
std::atomic<bool> subscriber_seen_at_publication_end(false);

std::atomic<bool> do_start_publication(false);
std::atomic<bool> publication_finished(false);

// publishing thread
auto publisher_thread = [&]() {
eCAL::Publisher::Configuration pub_config;
pub_config.shm.acknowledge_timeout_ms = 500;
eCAL::CPublisher pub("blob", pub_config);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: implicit conversion 'int' -> bool [readability-implicit-bool-conversion]

Suggested change
if (max_lines)
if (max_lines != 0)

{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
if (sub.IsPublished()) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}
Loading