Skip to content

Commit

Permalink
pubsub tests reclustered (shm, udp, tcp)
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky committed Apr 30, 2024
1 parent b88bd98 commit 3a18478
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 236 deletions.
2 changes: 1 addition & 1 deletion build/windows/05_build_pubsub_udp_only.bat
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ set CMAKE_OPTIONS=-DCMAKE_INSTALL_PREFIX=_install ^
-DECAL_CORE_BUILD_TESTS=ON ^
-DECAL_CORE_CONFIG_INIFILE=OFF ^
-DECAL_CORE_COMMAND_LINE=OFF ^
-DECAL_CORE_REGISTRATION=OFF ^
-DECAL_CORE_REGISTRATION=ON ^
-DECAL_CORE_REGISTRATION_SHM=OFF ^
-DECAL_CORE_MONITORING=OFF ^
-DECAL_CORE_PUBLISHER=ON ^
Expand Down
2 changes: 1 addition & 1 deletion build/windows/06_build_pubsub_tcp_only.bat
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ set CMAKE_OPTIONS=-DCMAKE_INSTALL_PREFIX=_install ^
-DECAL_CORE_BUILD_TESTS=ON ^
-DECAL_CORE_CONFIG_INIFILE=OFF ^
-DECAL_CORE_COMMAND_LINE=OFF ^
-DECAL_CORE_REGISTRATION=OFF ^
-DECAL_CORE_REGISTRATION=ON ^
-DECAL_CORE_REGISTRATION_SHM=OFF ^
-DECAL_CORE_MONITORING=OFF ^
-DECAL_CORE_PUBLISHER=ON ^
Expand Down
14 changes: 14 additions & 0 deletions ecal/tests/cpp/pubsub_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,25 @@ project(test_pubsub)
find_package(Threads REQUIRED)
find_package(GTest REQUIRED)

if(ECAL_CORE_TRANSPORT_SHM)
set(pubsub_test_src_shm
src/pubsub_test_shm.cpp
)
endif()

if(ECAL_CORE_TRANSPORT_UDP)
set(pubsub_test_src_udp
src/pubsub_test_udp.cpp
)
endif()

set(pubsub_test_src
src/pubsub_acknowledge.cpp
src/pubsub_multibuffer.cpp
src/pubsub_receive_test.cpp
src/pubsub_test.cpp
${pubsub_test_src_shm}
${pubsub_test_src_udp}
)

ecal_add_gtest(${PROJECT_NAME} ${pubsub_test_src})
Expand Down
2 changes: 1 addition & 1 deletion ecal/tests/cpp/pubsub_test/src/pubsub_acknowledge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace
}

// This test asserts that a timeouted acknowledge does not break subsequent calls
TEST(PubSub, TimeoutAcknowledgment)
TEST(core_cpp_pubsub, TimeoutAcknowledgment)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "TimeoutAcknowledgment", eCAL::Init::All));
Expand Down
2 changes: 1 addition & 1 deletion ecal/tests/cpp/pubsub_test/src/pubsub_multibuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ std::vector<char> multibuffer_pub_sub_test(int buffer_count, bool zero_copy, int
return received_content;
}

TEST(PubSub, MultibufferPubSub)
TEST(core_cpp_pubsub, MultibufferPubSub)
{
// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");
Expand Down
249 changes: 17 additions & 232 deletions ecal/tests/cpp/pubsub_test/src/pubsub_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,27 @@
#define DATA_FLOW_TIME 50
#define PAYLOAD_SIZE 1024

// subscriber callback function
std::atomic<size_t> g_callback_received_bytes;
std::atomic<size_t> g_callback_received_count;
void OnReceive(const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
namespace
{
g_callback_received_bytes += data_->size;
g_callback_received_count++;
}
// subscriber callback function
std::atomic<size_t> g_callback_received_bytes;
std::atomic<size_t> g_callback_received_count;
void OnReceive(const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
g_callback_received_bytes += data_->size;
g_callback_received_count++;
}

static std::string CreatePayLoad(size_t payload_size_)
{
std::string s = "Hello World ";
while(s.size() < payload_size_)
std::string CreatePayLoad(size_t payload_size_)
{
s += s;
std::string s = "Hello World ";
while(s.size() < payload_size_)
{
s += s;
}
s.resize(payload_size_);
return(s);
}
s.resize(payload_size_);
return(s);
}

TEST(core_cpp_pubsub, LeakedPubSub)
Expand Down Expand Up @@ -506,224 +509,6 @@ TEST(core_cpp_pubsub, DynamicCreate)
eCAL::Finalize();
}

TEST(core_cpp_pubsub, ZeroPayloadMessageSHM)
{
// default send string
const std::string send_s;

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::CSubscriber sub("A");

// create publisher config
eCAL::CPublisher::Config pub_config;
// set transport layer
pub_config.shm.send_mode = eCAL::TLayer::smode_on;
pub_config.udp.send_mode = eCAL::TLayer::smode_off;
pub_config.tcp.send_mode = eCAL::TLayer::smode_off;

// create publisher for topic "A"
eCAL::CPublisher pub("A", pub_config);

// add callback
EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2)));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);

g_callback_received_bytes = 0;
g_callback_received_count = 0;

EXPECT_EQ(send_s.size(), pub.Send(send_s));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

// check callback receive
EXPECT_EQ(send_s.size(), g_callback_received_bytes);
EXPECT_EQ(2, g_callback_received_count);

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}

TEST(core_cpp_pubsub, ZeroPayloadMessageUDP)
{
// default send string
std::string send_s;

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::CSubscriber sub("A");

// create publisher config
eCAL::CPublisher::Config pub_config;
// set transport layer
pub_config.shm.send_mode = eCAL::TLayer::smode_off;
pub_config.udp.send_mode = eCAL::TLayer::smode_on;
pub_config.tcp.send_mode = eCAL::TLayer::smode_off;

// create publisher for topic "A"
eCAL::CPublisher pub("A", pub_config);

// add callback
EXPECT_EQ(true, sub.AddReceiveCallback(std::bind(OnReceive, std::placeholders::_1, std::placeholders::_2)));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);

g_callback_received_bytes = 0;
g_callback_received_count = 0;

EXPECT_EQ(send_s.size(), pub.Send(send_s));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

EXPECT_EQ(send_s.size(), pub.Send(nullptr, 0));
eCAL::Process::SleepMS(DATA_FLOW_TIME);

// check callback receive
EXPECT_EQ(send_s.size(), g_callback_received_bytes);
EXPECT_EQ(2, g_callback_received_count);

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}

TEST(core_cpp_pubsub, MultipleSendsSHM)
{
// default send string
std::vector<std::string> send_vector{ "this", "is", "a", "", "testtest" };
std::string last_received_msg;
long long last_received_timestamp;

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::string::CSubscriber<std::string> sub("A");

// create publisher config
eCAL::CPublisher::Config pub_config;
// set transport layer
pub_config.shm.send_mode = eCAL::TLayer::smode_on;
pub_config.udp.send_mode = eCAL::TLayer::smode_off;
pub_config.tcp.send_mode = eCAL::TLayer::smode_off;

// create publisher for topic "A"
eCAL::string::CPublisher<std::string> pub("A", pub_config);

// add callback
auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/)
{
last_received_msg = msg_;
last_received_timestamp = time_;
};
EXPECT_TRUE(sub.AddReceiveCallback(save_data));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);
long long timestamp = 1;
for (const auto& elem : send_vector)
{
pub.Send(elem, timestamp);
eCAL::Process::SleepMS(DATA_FLOW_TIME);
EXPECT_EQ(last_received_msg, elem);
EXPECT_EQ(last_received_timestamp, timestamp);
++timestamp;
}

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}

TEST(core_cpp_pubsub, MultipleSendsUDP)
{
// default send string
const std::vector<std::string> send_vector{ "this", "is", "a", "", "testtest" };
std::string last_received_msg;
long long last_received_timestamp(0);

// initialize eCAL API
eCAL::Initialize(0, nullptr, "pubsub_test");

// publish / subscribe match in the same process
eCAL::Util::EnableLoopback(true);

// create subscriber for topic "A"
eCAL::string::CSubscriber<std::string> sub("A");

// create publisher config
eCAL::CPublisher::Config pub_config;
// set transport layer
pub_config.shm.send_mode = eCAL::TLayer::smode_off;
pub_config.udp.send_mode = eCAL::TLayer::smode_on;
pub_config.tcp.send_mode = eCAL::TLayer::smode_off;

// create publisher for topic "A"
eCAL::string::CPublisher<std::string> pub("A", pub_config);

// add callback
auto save_data = [&last_received_msg, &last_received_timestamp](const char* /*topic_name_*/, const std::string& msg_, long long time_, long long /*clock_*/, long long /*id_*/)
{
last_received_msg = msg_;
last_received_timestamp = time_;
};
EXPECT_TRUE(sub.AddReceiveCallback(save_data));

// let's match them
eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH);
long long timestamp = 1;
for (const auto& elem : send_vector)
{
pub.Send(elem, timestamp);
eCAL::Process::SleepMS(DATA_FLOW_TIME);
EXPECT_EQ(last_received_msg, elem);
EXPECT_EQ(last_received_timestamp, timestamp);
++timestamp;
}

// destroy subscriber
sub.Destroy();

// destroy publisher
pub.Destroy();

// finalize eCAL API
eCAL::Finalize();
}

TEST(core_cpp_pubsub, DestroyInCallback)
{
/* Test setup :
Expand Down

0 comments on commit 3a18478

Please sign in to comment.