Skip to content

Commit

Permalink
[core] redesign subscriber api (#1863)
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky authored Dec 17, 2024
1 parent 315df75 commit 7a1d931
Show file tree
Hide file tree
Showing 59 changed files with 1,764 additions and 1,086 deletions.
8 changes: 4 additions & 4 deletions app/mon/mon_plugins/raw_data_reflection/src/plugin_widget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString&, QWidget* p
ui_.content_layout->addWidget(frame);

// Connect the eCAL Subscriber
subscriber_.AddReceiveCallback([this](const eCAL::Registration::STopicId& /*topic_id*/,
subscriber_.SetReceiveCallback([this](const eCAL::Registration::STopicId& /*topic_id*/,
const eCAL::SDataTypeInformation& /*data_type_info*/,
const eCAL::SReceiveCallbackData& callback_data)
{
Expand All @@ -69,7 +69,7 @@ PluginWidget::PluginWidget(const QString& topic_name, const QString&, QWidget* p

PluginWidget::~PluginWidget()
{
subscriber_.RemReceiveCallback();
subscriber_.RemoveReceiveCallback();
}

void PluginWidget::ecalMessageReceivedCallback(const eCAL::SReceiveCallbackData& callback_data)
Expand Down Expand Up @@ -134,7 +134,7 @@ void PluginWidget::onUpdate()
void PluginWidget::onResume()
{
// (Re)Connect the eCAL Subscriber
subscriber_.AddReceiveCallback([this](const eCAL::Registration::STopicId& /*topic_id*/,
subscriber_.SetReceiveCallback([this](const eCAL::Registration::STopicId& /*topic_id*/,
const eCAL::SDataTypeInformation& /*data_type_info*/,
const eCAL::SReceiveCallbackData& callback_data)
{
Expand All @@ -144,7 +144,7 @@ void PluginWidget::onResume()

void PluginWidget::onPause()
{
subscriber_.RemReceiveCallback();
subscriber_.RemoveReceiveCallback();
}

void PluginWidget::updateRawMessageView()
Expand Down
10 changes: 5 additions & 5 deletions app/mon/mon_tui/src/tui/viewmodel/message_visualization/raw.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,12 +36,12 @@ class RawMessageVisualizationViewModel : public MessageVisualizationViewModel
mutable std::mutex message_mtx;
std::string latest_message;

void OnMessage(const struct eCAL::SReceiveCallbackData* callback_data)
void OnMessage(const struct eCAL::SReceiveCallbackData& callback_data)
{
{
std::lock_guard<std::mutex> lock{message_mtx};
latest_message = std::string(static_cast<char *>(callback_data->buf), callback_data->size);
message_timestamp = callback_data->time;
latest_message = std::string(static_cast<char *>(callback_data.buf), callback_data.size);
message_timestamp = callback_data.time;
}

NotifyDataUpdated();
Expand All @@ -52,7 +52,7 @@ class RawMessageVisualizationViewModel : public MessageVisualizationViewModel
: subscriber{topic}
{
using namespace std::placeholders;
subscriber.AddReceiveCallback(std::bind(&RawMessageVisualizationViewModel::OnMessage, this, _2));
subscriber.SetReceiveCallback(std::bind(&RawMessageVisualizationViewModel::OnMessage, this, _3));
}

std::string message() const
Expand Down
13 changes: 6 additions & 7 deletions app/rec/rec_client_core/src/ecal_rec_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -695,12 +695,12 @@ namespace eCAL
return subscribed_topics;
}

void EcalRecImpl::EcalMessageReceived(const char* topic_name, const eCAL::SReceiveCallbackData* callback_data)
void EcalRecImpl::EcalMessageReceived(const eCAL::Registration::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_)
{
auto ecal_receive_time = eCAL::Time::ecal_clock::now();
auto system_receive_time = std::chrono::steady_clock::now();

std::shared_ptr<Frame> frame = std::make_shared<Frame>(callback_data, topic_name, ecal_receive_time, system_receive_time);
std::shared_ptr<Frame> frame = std::make_shared<Frame>(&data_, topic_id_.topic_name, ecal_receive_time, system_receive_time);

pre_buffer_.push_back(frame);

Expand Down Expand Up @@ -821,14 +821,14 @@ namespace eCAL
if (subscriber_map_.find(topic) == subscriber_map_.end())
{
EcalRecLogger::Instance()->info("Subscribing to " + topic);
std::unique_ptr<eCAL::CSubscriber> subscriber = std::make_unique<eCAL::CSubscriber>();
if (!subscriber->Create(topic))
std::unique_ptr<eCAL::CSubscriber> subscriber = std::make_unique<eCAL::CSubscriber>(topic);
if (!subscriber)
{
EcalRecLogger::Instance()->error("Error creating subscriber for topic " + topic);
info_ = { false, "Error creating eCAL subsribers" };
continue;
}
if (!subscriber->AddReceiveCallback(std::bind(&EcalRecImpl::EcalMessageReceived, this, std::placeholders::_1, std::placeholders::_2)))
if (!subscriber->SetReceiveCallback(std::bind(&EcalRecImpl::EcalMessageReceived, this, std::placeholders::_1, std::placeholders::_3)))
{
EcalRecLogger::Instance()->error("Error adding callback for subscriber on topic " + topic);
info_ = { false, "Error creating eCAL subsribers" };
Expand All @@ -848,7 +848,6 @@ namespace eCAL
{
EcalRecLogger::Instance()->info("Unsubscribing from " + subscriber_it->first);

subscriber_it->second->Destroy();
subscriber_it = subscriber_map_.erase(subscriber_it);
}
else
Expand Down
4 changes: 2 additions & 2 deletions app/rec/rec_client_core/src/ecal_rec_impl.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -115,7 +115,7 @@ namespace eCAL

std::set<std::string> GetSubscribedTopics() const;

void EcalMessageReceived(const char* topic_name, const eCAL::SReceiveCallbackData* callback_data);
void EcalMessageReceived(const eCAL::Registration::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_);

//////////////////////////////////////
//// API for external threads ////
Expand Down
6 changes: 4 additions & 2 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ endif()
if(ECAL_CORE_SUBSCRIBER)
set(ecal_sub_src
src/pubsub/ecal_subscriber.cpp
src/pubsub/ecal_subscriber_impl.cpp
src/pubsub/ecal_subscriber_impl.h
src/pubsub/ecal_subscriber_v5.cpp
src/pubsub/ecal_subgate.cpp
src/pubsub/ecal_subgate.h
)
Expand Down Expand Up @@ -257,8 +260,6 @@ endif()

if(ECAL_CORE_SUBSCRIBER)
set(ecal_reader_src
src/readwrite/ecal_reader.cpp
src/readwrite/ecal_reader.h
src/readwrite/ecal_reader_layer.h
)
if(ECAL_CORE_TRANSPORT_UDP)
Expand Down Expand Up @@ -542,6 +543,7 @@ set(ecal_header_cmn
include/ecal/ecal_server_v5.h
include/ecal/ecal_service_info.h
include/ecal/ecal_subscriber.h
include/ecal/ecal_subscriber_v5.h
include/ecal/ecal_time.h
include/ecal/ecal_timer.h
include/ecal/ecal_tlayer.h
Expand Down
3 changes: 0 additions & 3 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
#include <ecal/ecal_config.h>
#include <ecal/ecal_payload_writer.h>
#include <ecal/ecal_types.h>
#include <ecal/config/publisher.h>

#include <chrono>
#include <cstddef>
#include <memory>
#include <string>

Expand Down
11 changes: 5 additions & 6 deletions ecal/core/include/ecal/ecal_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#include <memory>
#include <string>
#include <vector>

namespace eCAL
{
Expand Down Expand Up @@ -85,26 +84,26 @@ namespace eCAL
CServiceServer& operator=(CServiceServer&& rhs) noexcept;

/**
* @brief Add method callback.
* @brief Set/overwrite method callback.
*
* @param method_ Service method name.
* @param method_info_ Service method information (request & response types).
* @param callback_ Callback function for client request.
*
* @return True if successful.
* @return True if succeeded, false if not.
**/

// TODO: Provide new MethodCallbackT type using SServiceMethodInformation instead "MethodCallbackT = std::function<int(const std::string& method_, const std::string& req_type_, const std::string& resp_type_, const std::string& request_, std::string& response_)>"

ECAL_API_EXPORTED_MEMBER
bool AddMethodCallback(const std::string& method_, const SServiceMethodInformation& method_info_, const MethodCallbackT& callback_);
bool SetMethodCallback(const std::string& method_, const SServiceMethodInformation& method_info_, const MethodCallbackT& callback_);

/**
* @brief Remove method callback.
*
* @param method_ Service method name.
*
* @return True if successful.
* @return True if succeeded, false if not.
**/
ECAL_API_EXPORTED_MEMBER
bool RemoveMethodCallback(const std::string& method_);
Expand All @@ -131,7 +130,7 @@ namespace eCAL
/**
* @brief Check connection state.
*
* @return True if connected, false if not.
* @return True if succeeded, false if not.
**/
ECAL_API_EXPORTED_MEMBER
bool IsConnected();
Expand Down
Loading

0 comments on commit 7a1d931

Please sign in to comment.