Skip to content

Commit

Permalink
[zmq] add proxy mode to the ZmqServer
Browse files Browse the repository at this point in the history
* ZmqServer can now work as a proxy server (using ZmqClient to send
unprocessed messages)
* add sendRaw() method to the ZmqClient to avoid unnecessary
serialization
  • Loading branch information
Yakiv-Huryk committed Nov 9, 2024
1 parent bd0f2eb commit f4dedbb
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 33 deletions.
53 changes: 30 additions & 23 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <system_error>
#include <zmq.h>
#include "zmqclient.h"
#include "zmqserver.h"
#include "binaryserializer.h"

using namespace std;
Expand Down Expand Up @@ -113,27 +114,10 @@ void ZmqClient::connect()
m_connected = true;
}

void ZmqClient::sendMsg(
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
sendbuffer.data(),
sendbuffer.size(),
dbName,
tableName,
kcos);

if (serializedlen >= MQ_RESPONSE_MAX_COUNT)
{
SWSS_LOG_THROW("ZmqClient sendMsg message was too big (buffer size %d bytes, got %d), reduce the message size, message DROPPED",
MQ_RESPONSE_MAX_COUNT,
serializedlen);
}

SWSS_LOG_DEBUG("sending: %d", serializedlen);
void ZmqClient::sendRaw(const char* buffer, size_t size)
{
SWSS_LOG_DEBUG("sending: %zu", size);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
Expand All @@ -144,12 +128,12 @@ void ZmqClient::sendMsg(
std::lock_guard<std::mutex> lock(m_socketMutex);

// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
rc = zmq_send(m_socket, buffer, size, ZMQ_NOBLOCK);
}

if (rc >= 0)
{
SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen);
SWSS_LOG_DEBUG("zmq sended %zu bytes", size);
return;
}

Expand Down Expand Up @@ -192,9 +176,32 @@ void ZmqClient::sendMsg(
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(size);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

void ZmqClient::sendMsg(
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
sendbuffer.data(),
sendbuffer.size(),
dbName,
tableName,
kcos);

if (serializedlen >= MQ_RESPONSE_MAX_COUNT)
{
SWSS_LOG_THROW("ZmqClient sendMsg message was too big (buffer size %d bytes, got %d), reduce the message size, message DROPPED",
MQ_RESPONSE_MAX_COUNT,
serializedlen);
}

sendRaw(sendbuffer.data(), serializedlen);
}

}
4 changes: 3 additions & 1 deletion common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <queue>
#include <thread>
#include <mutex>
#include "zmqserver.h"
#include "table.h"

namespace swss {

Expand All @@ -24,6 +24,8 @@ class ZmqClient
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);

void sendRaw(const char* buffer, size_t size);
private:
void initialize(const std::string& endpoint, const std::string& vrf);

Expand Down
5 changes: 5 additions & 0 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string
SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str());
}

ZmqConsumerStateTable::~ZmqConsumerStateTable()
{
m_zmqServer.unregisterMessageHandler(m_db->getDbName(), getTableName());
}

void ZmqConsumerStateTable::handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos)
{
for (auto kco : kcos)
Expand Down
1 change: 1 addition & 0 deletions common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = false);
~ZmqConsumerStateTable();

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
Expand Down
49 changes: 43 additions & 6 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ ZmqServer::ZmqServer(const std::string& endpoint)

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
m_vrf(vrf),
m_proxy_mode(false)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
Expand All @@ -33,11 +34,23 @@ ZmqServer::~ZmqServer()
m_mqPollThread->join();
}

void ZmqServer::enableProxyMode(const std::string& proxy_endpoint)
{
m_proxy_client = make_unique<ZmqClient>(proxy_endpoint);
m_proxy_mode = true;
}

bool ZmqServer::isProxyMode() const {
return m_proxy_mode;
}

void ZmqServer::registerMessageHandler(
const std::string dbName,
const std::string tableName,
ZmqMessageHandler* handler)
{
std::lock_guard<std::mutex> lock(m_handlerMapMutext);

auto dbResult = m_HandlerMap.insert(pair<string, map<string, ZmqMessageHandler*>>(dbName, map<string, ZmqMessageHandler*>()));
if (dbResult.second) {
SWSS_LOG_DEBUG("ZmqServer add handler mapping for db: %s", dbName.c_str());
Expand All @@ -49,10 +62,31 @@ void ZmqServer::registerMessageHandler(
}
}

void ZmqServer::unregisterMessageHandler(const std::string &dbName, const std::string &tableName)
{
std::lock_guard<std::mutex> lock(m_handlerMapMutext);

SWSS_LOG_DEBUG("ZmqServer unregister handler for db: %s, table: %s", dbName.c_str(), tableName.c_str());

auto db = m_HandlerMap.find(dbName);
if (db == m_HandlerMap.end()) {
SWSS_LOG_ERROR("ZmqServer can't unregister a handler for db: %s - not found", dbName.c_str());
return;
}

auto removed = db->second.erase(tableName);
if (!removed) {
SWSS_LOG_ERROR("ZmqServer can't unregister a handler for db: %s table %s - not found", dbName.c_str(), tableName.c_str());
return;
}
}

ZmqMessageHandler* ZmqServer::findMessageHandler(
const std::string dbName,
const std::string tableName)
{
std::lock_guard<std::mutex> lock(m_handlerMapMutext);

auto dbMappingIter = m_HandlerMap.find(dbName);
if (dbMappingIter == m_HandlerMap.end()) {
SWSS_LOG_DEBUG("ZmqServer can't find any handler for db: %s", dbName.c_str());
Expand All @@ -77,12 +111,15 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size)

// find handler
auto handler = findMessageHandler(dbName, tableName);
if (handler == nullptr) {
SWSS_LOG_WARN("ZmqServer can't find handler for received message: %s", buffer);
return;
if (handler) {
handler->handleReceivedData(kcos);
} else {
if (isProxyMode()) {
m_proxy_client->sendRaw(buffer, size);
} else {
SWSS_LOG_WARN("ZmqServer can't find handler for received message: %.*s", (int)size, buffer);
}
}

handler->handleReceivedData(kcos);
}

void ZmqServer::mqPollThread()
Expand Down
16 changes: 16 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
#include <deque>
#include <condition_variable>
#include <vector>
#include <memory>
#include <atomic>
#include "table.h"
#include "zmqclient.h"

#define MQ_RESPONSE_MAX_COUNT (16*1024*1024)
#define MQ_SIZE 100
Expand Down Expand Up @@ -34,15 +37,22 @@ class ZmqServer
ZmqServer(const std::string& endpoint, const std::string& vrf);
~ZmqServer();

void enableProxyMode(const std::string& proxy_endpoint);

void registerMessageHandler(
const std::string dbName,
const std::string tableName,
ZmqMessageHandler* handler);

void unregisterMessageHandler(const std::string &dbName,
const std::string &tableName);

private:
void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();

bool isProxyMode() const;

ZmqMessageHandler* findMessageHandler(const std::string dbName, const std::string tableName);

Expand All @@ -56,6 +66,12 @@ class ZmqServer

std::string m_vrf;

std::atomic<bool> m_proxy_mode;

std::unique_ptr<ZmqClient> m_proxy_client;

std::mutex m_handlerMapMutext;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
1 change: 1 addition & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tests_tests_SOURCES = tests/redis_ut.cpp \
tests/profileprovider_ut.cpp \
tests/c_api_ut.cpp \
tests/performancetimer_ut.cpp \
tests/zmq_proxy_ut.cpp \
tests/main.cpp

tests_tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(LIBNL_CFLAGS)
Expand Down
6 changes: 3 additions & 3 deletions tests/c_api_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
EXPECT_EQ(kfvFieldsValues(kfvs[1]).size(), 0);
}

// Server must be freed first to safely release message handlers (ZmqConsumerStateTable)
SWSSZmqServer_free(srv);

// The message handlers (ZmqConsumerStateTable) must be freed first to safely unregister from the Server
SWSSZmqProducerStateTable_free(pst);
SWSSZmqConsumerStateTable_free(cst);

SWSSZmqServer_free(srv);

SWSSZmqClient_free(cli);

SWSSDBConnector_flushdb(db);
Expand Down
Loading

0 comments on commit f4dedbb

Please sign in to comment.