Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[zmq] add proxy mode to the ZmqServer #948

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <system_error>
#include <zmq.h>
#include "zmqclient.h"
#include "zmqserver.h"
#include "binaryserializer.h"

using namespace std;
Expand Down Expand Up @@ -114,26 +115,9 @@ void ZmqClient::connect()
m_connected = true;
}

void ZmqClient::sendMsg(
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos)
void ZmqClient::sendRaw(const char* buffer, size_t size)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_sendbuffer.data(),
m_sendbuffer.size(),
dbName,
tableName,
kcos);

if (serializedlen >= MQ_RESPONSE_MAX_COUNT)
{
SWSS_LOG_THROW("ZmqClient sendMsg message was too big (buffer size %d bytes, got %d), reduce the message size, message DROPPED",
MQ_RESPONSE_MAX_COUNT,
serializedlen);
}

SWSS_LOG_DEBUG("sending: %d", serializedlen);
SWSS_LOG_DEBUG("sending: %zu", size);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
Expand All @@ -144,12 +128,12 @@ void ZmqClient::sendMsg(
std::lock_guard<std::mutex> lock(m_socketMutex);

// Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send
rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK);
rc = zmq_send(m_socket, buffer, size, ZMQ_NOBLOCK);
}

if (rc >= 0)
{
SWSS_LOG_DEBUG("zmq sended %d bytes", serializedlen);
SWSS_LOG_DEBUG("zmq sended %zu bytes", size);
return;
}

Expand Down Expand Up @@ -192,9 +176,31 @@ void ZmqClient::sendMsg(
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(size);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::io_error), message);
}

void ZmqClient::sendMsg(
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_sendbuffer.data(),
m_sendbuffer.size(),
dbName,
tableName,
kcos);

if (serializedlen >= MQ_RESPONSE_MAX_COUNT)
{
SWSS_LOG_THROW("ZmqClient sendMsg message was too big (buffer size %d bytes, got %d), reduce the message size, message DROPPED",
MQ_RESPONSE_MAX_COUNT,
serializedlen);
}

sendRaw(m_sendbuffer.data(), serializedlen);
}

}
4 changes: 3 additions & 1 deletion common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <queue>
#include <thread>
#include <mutex>
#include "zmqserver.h"
#include "table.h"

namespace swss {

Expand All @@ -23,6 +23,8 @@ class ZmqClient
void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos);

void sendRaw(const char* buffer, size_t size);
private:
void initialize(const std::string& endpoint, const std::string& vrf);

Expand Down
5 changes: 5 additions & 0 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string
SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str());
}

ZmqConsumerStateTable::~ZmqConsumerStateTable()
{
m_zmqServer.unregisterMessageHandler(m_db->getDbName(), getTableName());
}

void ZmqConsumerStateTable::handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos)
{
for (auto kco : kcos)
Expand Down
1 change: 1 addition & 0 deletions common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ZmqConsumerStateTable(DBConnector *db, const std::string &tableName, ZmqServer &zmqServer, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0, bool dbPersistence = false);
~ZmqConsumerStateTable();

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
Expand Down
49 changes: 43 additions & 6 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ ZmqServer::ZmqServer(const std::string& endpoint)

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
m_vrf(vrf),
m_proxy_mode(false)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
Expand All @@ -33,11 +34,23 @@ ZmqServer::~ZmqServer()
m_mqPollThread->join();
}

void ZmqServer::enableProxyMode(const std::string& proxy_endpoint)
{
m_proxy_client = make_unique<ZmqClient>(proxy_endpoint);
m_proxy_mode = true;
}

bool ZmqServer::isProxyMode() const {
return m_proxy_mode;
}

void ZmqServer::registerMessageHandler(
const std::string dbName,
const std::string tableName,
ZmqMessageHandler* handler)
{
std::lock_guard<std::mutex> lock(m_handlerMapMutext);

auto dbResult = m_HandlerMap.insert(pair<string, map<string, ZmqMessageHandler*>>(dbName, map<string, ZmqMessageHandler*>()));
if (dbResult.second) {
SWSS_LOG_DEBUG("ZmqServer add handler mapping for db: %s", dbName.c_str());
Expand All @@ -49,10 +62,31 @@ void ZmqServer::registerMessageHandler(
}
}

void ZmqServer::unregisterMessageHandler(const std::string &dbName, const std::string &tableName)
{
std::lock_guard<std::mutex> lock(m_handlerMapMutext);

SWSS_LOG_DEBUG("ZmqServer unregister handler for db: %s, table: %s", dbName.c_str(), tableName.c_str());

auto db = m_HandlerMap.find(dbName);
if (db == m_HandlerMap.end()) {
SWSS_LOG_ERROR("ZmqServer can't unregister a handler for db: %s - not found", dbName.c_str());
return;
}

auto removed = db->second.erase(tableName);
if (!removed) {
SWSS_LOG_ERROR("ZmqServer can't unregister a handler for db: %s table %s - not found", dbName.c_str(), tableName.c_str());
return;
}
}

ZmqMessageHandler* ZmqServer::findMessageHandler(
const std::string dbName,
const std::string tableName)
{
std::lock_guard<std::mutex> lock(m_handlerMapMutext);

auto dbMappingIter = m_HandlerMap.find(dbName);
if (dbMappingIter == m_HandlerMap.end()) {
SWSS_LOG_DEBUG("ZmqServer can't find any handler for db: %s", dbName.c_str());
Expand All @@ -77,12 +111,15 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size)

// find handler
auto handler = findMessageHandler(dbName, tableName);
if (handler == nullptr) {
SWSS_LOG_WARN("ZmqServer can't find handler for received message: %s", buffer);
return;
if (handler) {
handler->handleReceivedData(kcos);
} else {
if (isProxyMode()) {
m_proxy_client->sendRaw(buffer, size);
} else {
SWSS_LOG_WARN("ZmqServer can't find handler for received message: %.*s", (int)size, buffer);
}
}

handler->handleReceivedData(kcos);
}

void ZmqServer::mqPollThread()
Expand Down
16 changes: 16 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
#include <deque>
#include <condition_variable>
#include <vector>
#include <memory>
#include <atomic>
#include "table.h"
#include "zmqclient.h"

#define MQ_RESPONSE_MAX_COUNT (16*1024*1024)
#define MQ_SIZE 100
Expand Down Expand Up @@ -34,15 +37,22 @@ class ZmqServer
ZmqServer(const std::string& endpoint, const std::string& vrf);
~ZmqServer();

void enableProxyMode(const std::string& proxy_endpoint);

void registerMessageHandler(
const std::string dbName,
const std::string tableName,
ZmqMessageHandler* handler);

void unregisterMessageHandler(const std::string &dbName,
const std::string &tableName);

private:
void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();

bool isProxyMode() const;

ZmqMessageHandler* findMessageHandler(const std::string dbName, const std::string tableName);

Expand All @@ -56,6 +66,12 @@ class ZmqServer

std::string m_vrf;

std::atomic<bool> m_proxy_mode;

std::unique_ptr<ZmqClient> m_proxy_client;

std::mutex m_handlerMapMutext;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
1 change: 1 addition & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tests_tests_SOURCES = tests/redis_ut.cpp \
tests/profileprovider_ut.cpp \
tests/c_api_ut.cpp \
tests/performancetimer_ut.cpp \
tests/zmq_proxy_ut.cpp \
tests/main.cpp

tests_tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(LIBNL_CFLAGS)
Expand Down
6 changes: 3 additions & 3 deletions tests/c_api_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,12 @@ TEST(c_api, ZmqConsumerProducerStateTable) {
EXPECT_EQ(kfvFieldsValues(kfvs[1]).size(), 0);
}

// Server must be freed first to safely release message handlers (ZmqConsumerStateTable)
SWSSZmqServer_free(srv);

// The message handlers (ZmqConsumerStateTable) must be freed first to safely unregister from the Server
SWSSZmqProducerStateTable_free(pst);
SWSSZmqConsumerStateTable_free(cst);

SWSSZmqServer_free(srv);

SWSSZmqClient_free(cli);

SWSSDBConnector_flushdb(db);
Expand Down
Loading