Skip to content

Commit

Permalink
PubSubStruct: implement
Browse files Browse the repository at this point in the history
Unifies data structures for data publication and subscription on local and
remote layers. This will be probably sufficient for a long time.

Removes `RemoteMsg` as it's replaced by `SubData`.
  • Loading branch information
DavidB137 committed Sep 5, 2024
1 parent ecac23e commit 3be307c
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 162 deletions.
8 changes: 4 additions & 4 deletions include/kvik/layers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

#include "kvik/errors.hpp"
#include "kvik/local_msg.hpp"
#include "kvik/remote_msg.hpp"
#include "kvik/pub_sub_struct.hpp"

namespace kvik
{
Expand Down Expand Up @@ -72,7 +72,7 @@ namespace kvik
class IRemoteLayer
{
public:
using RecvCb = std::function<ErrCode(const RemoteMsg &)>;
using RecvCb = std::function<ErrCode(const SubData &)>;
using ReconnectCb = std::function<ErrCode()>;

protected:
Expand All @@ -85,10 +85,10 @@ namespace kvik
*
* Should be used by `INode` only!
*
* @param msg Message to publish
* @param data Message to publish
* @return Error code
*/
virtual ErrCode publish(const RemoteMsg &msg) = 0;
virtual ErrCode publish(const PubData &data) = 0;

/**
* @brief Subscribes to given topic
Expand Down
15 changes: 8 additions & 7 deletions include/kvik/local_broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "kvik/errors.hpp"
#include "kvik/layers.hpp"
#include "kvik/pub_sub_struct.hpp"
#include "kvik/wildcard_trie.hpp"

namespace kvik
Expand All @@ -26,8 +27,8 @@ namespace kvik
class LocalBroker : public IRemoteLayer
{
std::mutex m_mutex;
kvik::WildcardTrie<bool> m_subs; //!< Subscriptions
std::string m_topicPrefix; //!< Topic prefix for publishing
kvik::WildcardTrie<bool> m_subs; //!< Subscriptions
std::string m_topicPrefix; //!< Topic prefix for publishing

public:
/**
Expand All @@ -41,18 +42,18 @@ namespace kvik
~LocalBroker();

/**
* @brief Publishes message coming from node
* @brief Publishes data coming from node
*
* Should be used by `INode` only!
*
* If subscription for message's topic exists, immediately calls
* receive callback (from current thread).
* If subscription for topic exists, immediately calls receive
* callback (from current thread).
*
* @param msg Message to publish
* @param data Data to publish
* @return ErrCode::SUCCESS If no receive callback called.
* @return Any error code returned by receive callback.
*/
ErrCode publish(const RemoteMsg &msg);
ErrCode publish(const PubData &data);

/**
* @brief Subscribes to given topic
Expand Down
29 changes: 20 additions & 9 deletions include/kvik/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <functional>

#include "kvik/errors.hpp"
#include "kvik/pub_sub_struct.hpp"

namespace kvik
{
Expand All @@ -38,12 +39,6 @@ namespace kvik
TopicSeparators topicSep;
};

/**
* @brief Subscribe callback type
*/
using SubscribeCb = std::function<void(const std::string &topic,
const std::string &payload)>;

/**
* @brief Interface for generic node type of Kvik
*/
Expand All @@ -64,8 +59,24 @@ namespace kvik
* @param payload Payload
* @return Error code
*/
virtual ErrCode publish(const std::string &topic,
const std::string &payload) = 0;
inline ErrCode publish(const std::string &topic,
const std::string &payload)
{
return INode::publish({
.topic = topic,
.payload = payload,
});
}

/**
* @brief Publishes payload to topic
*
* More generic version of `publish(topic, payload)`.
*
* @param data Data to publish
* @return Error code
*/
virtual ErrCode publish(const PubData &data) = 0;

/**
* @brief Subscribes to topic
Expand All @@ -76,7 +87,7 @@ namespace kvik
* @param cb Callback function
* @return Error code
*/
virtual ErrCode subscribe(const std::string &topic, SubscribeCb cb) = 0;
virtual ErrCode subscribe(const std::string &topic, SubCb cb) = 0;

/**
* @brief Unsubscribes from topic
Expand Down
99 changes: 99 additions & 0 deletions include/kvik/pub_sub_struct.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* @file pub_sub_struct.hpp
* @author Dávid Benko ([email protected])
* @brief Publication/subscription structs and callbacks
*
* @copyright Copyright (c) 2024
*
*/

#pragma once

#include <functional>
#include <string>

namespace kvik
{
/**
* @brief Subscription data structure
*
* Contains topic, payload and in the future maybe more details of
* received data for subscription.
*/
struct SubData
{
std::string topic = ""; //!< Topic of message
std::string payload = ""; //!< Payload of message

bool operator==(const SubData &other) const;
bool operator!=(const SubData &other) const;

/**
* @brief Converts `SubData` to printable string
*
* Primarily for logging purposes
*
* @return String representation of contained data
*/
std::string toString() const;
};

/**
* @brief Publication data structure
*
* Contains topic, payload and in the future maybe more settings.
*/
struct PubData
{
std::string topic = ""; //!< Topic of message
std::string payload = ""; //!< Payload of message

bool operator==(const PubData &other) const;
bool operator!=(const PubData &other) const;

/**
* @brief Converts `PubData` to printable string
*
* Primarily for logging purposes
*
* @return String representation of contained data
*/
std::string toString() const;

/**
* @brief Converts `PubData` to `SubData`
*
* Useful when publication is immediatelly sent back as subscription.
*
* @return Subscription data
*/
SubData toSubData() const;
};

/**
* @brief Subscribe callback type
*/
using SubCb = std::function<void(const SubData &data)>;
} // namespace kvik

// Define hasher function
template <>
struct std::hash<kvik::PubData>
{
std::size_t operator()(kvik::PubData const &data) const noexcept
{
return std::hash<std::string>{}(data.topic) +
std::hash<std::string>{}(data.payload);
}
};

// Define hasher function
template <>
struct std::hash<kvik::SubData>
{
std::size_t operator()(kvik::SubData const &data) const noexcept
{
return std::hash<std::string>{}(data.topic) +
std::hash<std::string>{}(data.payload);
}
};
49 changes: 0 additions & 49 deletions include/kvik/remote_msg.hpp

This file was deleted.

10 changes: 5 additions & 5 deletions src/common/local_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,26 @@ namespace kvik
KVIK_LOGD("Deinitialized");
}

ErrCode LocalBroker::publish(const RemoteMsg &msg)
ErrCode LocalBroker::publish(const PubData &data)
{
KVIK_LOGD("Publishing %zu bytes to topic '%s'",
msg.payload.length(), msg.topic.c_str());
data.payload.length(), data.topic.c_str());

// Check if node is subscribed to this topic
bool subscribed;
{
const std::scoped_lock lock(m_mutex);
subscribed = !m_subs.find(msg.topic).empty();
subscribed = !m_subs.find(data.topic).empty();
}

if (subscribed && m_recvCb != nullptr)
{
KVIK_LOGD("Subscription exists for published data, calling "
"callback on topic '%s'",
msg.topic.c_str());
data.topic.c_str());

// Send data back as received
KVIK_RETURN_ERROR(m_recvCb(msg));
KVIK_RETURN_ERROR(m_recvCb(data.toSubData()));
}

return ErrCode::SUCCESS;
Expand Down
57 changes: 57 additions & 0 deletions src/common/pub_sub_struct.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* @file pub_sub_struct.cpp
* @author Dávid Benko ([email protected])
* @brief Publication/subscription structs and callbacks
*
* @copyright Copyright (c) 2024
*
*/

#include <string>

#include "kvik/pub_sub_struct.hpp"

namespace kvik
{
bool PubData::operator==(const PubData &other) const
{
return topic == other.topic &&
payload == other.payload;
}

bool PubData::operator!=(const PubData &other) const
{
return !this->operator==(other);
}

std::string PubData::toString() const
{
return (!topic.empty() ? topic : "(no topic)") + " " +
"(" + std::to_string(payload.length()) + " B payload)";
}

SubData PubData::toSubData() const
{
return {
.topic = topic,
.payload = payload,
};
}

bool SubData::operator==(const SubData &other) const
{
return topic == other.topic &&
payload == other.payload;
}

bool SubData::operator!=(const SubData &other) const
{
return !this->operator==(other);
}

std::string SubData::toString() const
{
return (!topic.empty() ? topic : "(no topic)") + " " +
"(" + std::to_string(payload.length()) + " B payload)";
}
} // namespace kvik
Loading

0 comments on commit 3be307c

Please sign in to comment.