Skip to content

Commit

Permalink
Rebased and pre-commit checked (#951)
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastianoTaddei authored Jan 28, 2025
1 parent a6f2e6a commit 45b678f
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 40 deletions.
138 changes: 116 additions & 22 deletions plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#include "datastream_zmq.h"
#include "ui_datastream_zmq.h"

#include <QMessageBox>
#include "PlotJuggler/messageparser_base.h"
#include <QDebug>
#include <QSettings>
#include <QDialog>
#include <QIntValidator>
#include <QMessageBox>
#include <QSettings>
#include <chrono>
#include "PlotJuggler/messageparser_base.h"
#include <iostream>

using namespace PJ;

Expand Down Expand Up @@ -73,7 +74,7 @@ bool DataStreamZMQ::start(QStringList*)
QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString();
QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString();
QString topics = settings.value("ZMQ_Subscriber::topics", "").toString();
bool is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool();
_is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool();

QString previous_address = address;

Expand All @@ -86,11 +87,11 @@ bool DataStreamZMQ::start(QStringList*)
else
{
previous_address = dialog->ui->lineEditAddress->text();
dialog->ui->lineEditAddress->setText("*");
dialog->ui->lineEditAddress->setText("0.0.0.0");
}
});

if (is_connect)
if (_is_connect)
{
dialog->ui->radioConnect->setChecked(true);
}
Expand All @@ -105,21 +106,19 @@ bool DataStreamZMQ::start(QStringList*)
dialog->ui->lineEditPort->setText(QString::number(port));
dialog->ui->lineEditTopics->setText(topics);

ParserFactoryPlugin::Ptr parser_creator;

connect(dialog->ui->comboBoxProtocol,
qOverload<const QString&>(&QComboBox::currentIndexChanged), this,
[&](const QString& selected_protocol) {
if (parser_creator)
if (_parser_creator)
{
if (auto prev_widget = parser_creator->optionsWidget())
if (auto prev_widget = _parser_creator->optionsWidget())
{
prev_widget->setVisible(false);
}
}
parser_creator = parserFactories()->at(selected_protocol);
_parser_creator = parserFactories()->at(selected_protocol);

if (auto widget = parser_creator->optionsWidget())
if (auto widget = _parser_creator->optionsWidget())
{
widget->setVisible(true);
}
Expand All @@ -138,21 +137,21 @@ bool DataStreamZMQ::start(QStringList*)
port = dialog->ui->lineEditPort->text().toUShort(&ok);
protocol = dialog->ui->comboBoxProtocol->currentText();
topics = dialog->ui->lineEditTopics->text();
is_connect = dialog->ui->radioConnect->isChecked();
_is_connect = dialog->ui->radioConnect->isChecked();

_parser = parser_creator->createParser({}, {}, {}, dataMap());
_parser = _parser_creator->createParser({}, {}, {}, dataMap());

// save back to service
settings.setValue("ZMQ_Subscriber::address", address);
settings.setValue("ZMQ_Subscriber::protocol", protocol);
settings.setValue("ZMQ_Subscriber::port", port);
settings.setValue("ZMQ_Subscriber::topics", topics);
settings.setValue("ZMQ_Subscriber::is_connect", is_connect);
settings.setValue("ZMQ_Subscriber::is_connect", _is_connect);

QString addr =
dialog->ui->comboBox->currentText() + address + ":" + QString::number(port);
_socket_address = addr.toStdString();
if (is_connect)
if (_is_connect)
{
_zmq_socket.connect(_socket_address.c_str());
}
Expand All @@ -164,6 +163,12 @@ bool DataStreamZMQ::start(QStringList*)
parseTopicFilters(topics);
subscribeTopics();

// Add a parser for each topic
for (const auto& topic : _topic_filters)
{
_parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap());
}

_zmq_socket.set(zmq::sockopt::rcvtimeo, 100);

qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address);
Expand All @@ -188,7 +193,14 @@ void DataStreamZMQ::shutdown()

unsubscribeTopics();

_zmq_socket.disconnect(_socket_address.c_str());
if (_is_connect)
{
_zmq_socket.disconnect(_socket_address.c_str());
}
else
{
_zmq_socket.unbind(_socket_address.c_str());
}
}
}

Expand All @@ -199,18 +211,79 @@ void DataStreamZMQ::receiveLoop()
zmq::message_t recv_msg;
zmq::recv_result_t result = _zmq_socket.recv(recv_msg);

if (recv_msg.size() > 0)
// If we did not receive anything, continue
if (recv_msg.size() <= 0)
{
continue;
}

// If there are more parts, then it is the topic
std::string topic = "";
if (recv_msg.more())
{
using namespace std::chrono;
auto ts = high_resolution_clock::now().time_since_epoch();
double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).count());
topic =
std::string(reinterpret_cast<const char*>(recv_msg.data()), recv_msg.size());

PJ::MessageRef msg(reinterpret_cast<uint8_t*>(recv_msg.data()), recv_msg.size());
// Then it is the payload
recv_msg.rebuild();
result = _zmq_socket.recv(recv_msg);

// If we did not receive anything, continue
if (recv_msg.size() <= 0)
{
continue;
}
}

PJ::MessageRef msg{ PJ::MessageRef(reinterpret_cast<uint8_t*>(recv_msg.data()),
recv_msg.size()) };

// If there are more parts, then it is the timestamp
double timestamp = 0.0;
if (recv_msg.more())
{
recv_msg.rebuild();
result = _zmq_socket.recv(recv_msg);

if (recv_msg.size() > 0)
{
// The timestamp is the seconds since the epoch as a string
timestamp = std::stod(
std::string(reinterpret_cast<const char*>(recv_msg.data()), recv_msg.size()));
}
}
else
{
// If there are no more parts, the timestamp is the current time
timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count() *
1e-6;
}

// Parse the message without a topic if it is empty
if (topic.empty())
{
if (parseMessage(msg, timestamp))
{
emit this->dataReceived();
}
}
// Otherwise, parse the message with the topic
else
{
if (parseMessage(topic, msg, timestamp))
{
emit this->dataReceived();
}
}

// Extinguish remaining parts (if any)
while (recv_msg.more())
{
recv_msg.rebuild();
result = _zmq_socket.recv(recv_msg);
}
}
}

Expand All @@ -228,6 +301,27 @@ bool DataStreamZMQ::parseMessage(const PJ::MessageRef& msg, double& timestamp)
}
}

bool DataStreamZMQ::parseMessage(const std::string& topic, const PJ::MessageRef& msg,
double& timestamp)
{
try
{
std::lock_guard<std::mutex> lock(mutex());
// If the topic is not in the map keys, create a new parser
if (_parsers.find(topic) == _parsers.end())
{
_parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap());
}

_parsers[topic]->parseMessage(msg, timestamp);
return true;
}
catch (...)
{
return false;
}
}

void DataStreamZMQ::parseTopicFilters(const QString& topic_filters)
{
const QRegExp regex("(,{0,1}\\s+)|(;\\s*)");
Expand Down
12 changes: 9 additions & 3 deletions plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once
#include <QDialog>

#include <QtPlugin>
#include <thread>
#include "PlotJuggler/datastreamer_base.h"
#include "PlotJuggler/messageparser_base.h"
#include "ui_datastream_zmq.h"
#include "zmq.hpp"
#include <QtPlugin>
#include <map>
#include <string>
#include <thread>

class StreamZMQDialog : public QDialog
{
Expand Down Expand Up @@ -57,9 +59,13 @@ class DataStreamZMQ : public PJ::DataStreamer
std::string _socket_address;
std::thread _receive_thread;
std::vector<std::string> _topic_filters;

std::map<std::string, PJ::MessageParserPtr> _parsers;
PJ::ParserFactoryPlugin::Ptr _parser_creator;
bool _is_connect = false;
void receiveLoop();
bool parseMessage(const PJ::MessageRef& msg, double& timestamp);
bool parseMessage(const std::string& topic, const PJ::MessageRef& msg,
double& timestamp);
void parseTopicFilters(const QString& filters);
void subscribeTopics();
void unsubscribeTopics();
Expand Down
49 changes: 34 additions & 15 deletions plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@
import json
import argparse

from time import sleep
from time import sleep, time_ns
import numpy as np

PORT = 9872

parser = argparse.ArgumentParser("start_test_publisher")

parser.add_argument("--topic|-t",
dest="topic",
help="Topic on which messages will be published",
type=str,
required=False)
parser.add_argument(
"--topic|-t",
dest="topic",
help="Topic on which messages will be published",
type=str,
required=False,
)
parser.add_argument(
"--timestamp",
dest="timestamp",
help="Send timestamp as message part, requires topic to work properly",
required=False,
action="store_true",
)

args = parser.parse_args()
topic = args.topic
timestamp = args.timestamp


def main():
Expand All @@ -29,28 +39,37 @@ def main():
ticks = 0

while True:
out_str = []
packet = []
data = {
"ticks": ticks,
"data": {
"cos": math.cos(ticks),
"sin": math.sin(ticks),
"floor": np.floor(np.cos(ticks)),
"ceil": np.ceil(np.cos(ticks))
}
"ceil": np.ceil(np.cos(ticks)),
},
}

if topic:
print(f"[{topic}] - " + json.dumps(data))
server_socket.send_multipart(
[topic.encode(), json.dumps(data).encode()])
else:
print(json.dumps(data))
server_socket.send(json.dumps(data).encode())
out_str.append(f"[{topic}] - ")
packet.append(topic.encode())

out_str.append(json.dumps(data))
packet.append(out_str[-1].encode())

if timestamp:
timestamp_s = str(time_ns() * 1e-9)
out_str.append(" - timestamp: " + timestamp_s)
packet.append(timestamp_s.encode())

print("".join(out_str))
server_socket.send_multipart(packet)

ticks += 1

sleep(0.1)


if __name__ == '__main__':
if __name__ == "__main__":
main()

0 comments on commit 45b678f

Please sign in to comment.