From 06a974e968036d4288b4931722341c7de1268cb0 Mon Sep 17 00:00:00 2001 From: Oleksandr Date: Fri, 17 Nov 2023 18:19:46 +0200 Subject: [PATCH] implement TelegramBuffer which acts like a stream, with ability to extract well formed telegrams --- src/InteractivePathAnalysis/client/client.cpp | 3 +- src/InteractivePathAnalysis/client/client.h | 2 +- src/InteractivePathAnalysis/client/keys.h | 2 +- .../client/tcpsocket.cpp | 12 ++-- .../client/tcpsocket.h | 4 +- .../client/telegrambuffer.cpp | 25 ++++++++ .../client/telegrambuffer.h | 57 +++++++++++++++++++ src/Main/CMakeLists.txt | 2 + 8 files changed, 97 insertions(+), 10 deletions(-) create mode 100644 src/InteractivePathAnalysis/client/telegrambuffer.cpp create mode 100644 src/InteractivePathAnalysis/client/telegrambuffer.h diff --git a/src/InteractivePathAnalysis/client/client.cpp b/src/InteractivePathAnalysis/client/client.cpp index f0a5d3261..edc670afe 100644 --- a/src/InteractivePathAnalysis/client/client.cpp +++ b/src/InteractivePathAnalysis/client/client.cpp @@ -78,11 +78,12 @@ void Client::handleResponse(const QByteArray& bytes) } } -void Client::sendRequest(const QByteArray& requestBytes, const QString& initiator) +void Client::sendRequest(QByteArray& requestBytes, const QString& initiator) { if (!m_socket.isConnected()) { m_socket.connect(); } + requestBytes.append(static_cast(TELEGRAM_FRAME_DELIMETER)); qDebug() << "sending:" << requestBytes << QString("requested by [%1]").arg(initiator); if (!m_socket.write(requestBytes)) { qCritical() << "unable to send" << requestBytes; diff --git a/src/InteractivePathAnalysis/client/client.h b/src/InteractivePathAnalysis/client/client.h index 6a0207198..3277d9286 100644 --- a/src/InteractivePathAnalysis/client/client.h +++ b/src/InteractivePathAnalysis/client/client.h @@ -40,7 +40,7 @@ public slots: QTimer m_timer; #endif // ENABLE_AUTOMATIC_REQUEST - void sendRequest(const QByteArray&, const QString&); + void sendRequest(QByteArray&, const QString&); void handleResponse(const QByteArray&); }; diff --git a/src/InteractivePathAnalysis/client/keys.h b/src/InteractivePathAnalysis/client/keys.h index aa235331b..241172c78 100644 --- a/src/InteractivePathAnalysis/client/keys.h +++ b/src/InteractivePathAnalysis/client/keys.h @@ -5,7 +5,7 @@ constexpr const char* KEY_CMD = "CMD"; constexpr const char* KEY_OPTIONS = "OPTIONS"; constexpr const char* KEY_DATA = "DATA"; constexpr const char* KEY_STATUS = "STATUS"; -constexpr const char* END_TELEGRAM_SEQUENCE = "###___END_FRAME___###"; +constexpr const unsigned char TELEGRAM_FRAME_DELIMETER{0x17}; // 0x17 - End of Transmission Block enum CMD { CMD_GET_PATH_LIST_ID=0, diff --git a/src/InteractivePathAnalysis/client/tcpsocket.cpp b/src/InteractivePathAnalysis/client/tcpsocket.cpp index 17f43861f..37c601f07 100644 --- a/src/InteractivePathAnalysis/client/tcpsocket.cpp +++ b/src/InteractivePathAnalysis/client/tcpsocket.cpp @@ -74,18 +74,18 @@ void TcpSocket::handleStateChanged(QAbstractSocket::SocketState state) void TcpSocket::handleDataReady() { QByteArray bytes = m_socket.readAll(); - m_bytesBuf.append(bytes); + m_telegramBuff.append(bytes.constData()); - if (bytes.endsWith(END_TELEGRAM_SEQUENCE)) { - m_bytesBuf.chop(strlen(END_TELEGRAM_SEQUENCE)); // remove end telegram sequence - emit dataRecieved(m_bytesBuf); - m_bytesBuf.clear(); + auto frames = m_telegramBuff.takeFrames(); + for (const ByteArray& frame: frames) { + QByteArray bytes(reinterpret_cast(frame.data().data()), frame.data().size()); + emit dataRecieved(bytes); } } void TcpSocket::handleError(QAbstractSocket::SocketError) { - m_bytesBuf.clear(); + m_telegramBuff.clear(); qDebug() << m_socket.errorString(); } diff --git a/src/InteractivePathAnalysis/client/tcpsocket.h b/src/InteractivePathAnalysis/client/tcpsocket.h index eb407c3cb..df62a249d 100644 --- a/src/InteractivePathAnalysis/client/tcpsocket.h +++ b/src/InteractivePathAnalysis/client/tcpsocket.h @@ -1,5 +1,7 @@ #pragma once +#include "telegrambuffer.h" + #include #include #include @@ -32,7 +34,7 @@ class TcpSocket : public QObject { private: QTcpSocket m_socket; QTimer m_connectionWatcher; - QByteArray m_bytesBuf; // to aggregate chunk of telegrams + TelegramBuffer m_telegramBuff; bool ensureConnected(); diff --git a/src/InteractivePathAnalysis/client/telegrambuffer.cpp b/src/InteractivePathAnalysis/client/telegrambuffer.cpp new file mode 100644 index 000000000..83597683a --- /dev/null +++ b/src/InteractivePathAnalysis/client/telegrambuffer.cpp @@ -0,0 +1,25 @@ +#include "telegrambuffer.h" +#include "keys.h" + +void TelegramBuffer::append(const ByteArray& bytes) +{ + m_rawBuffer.append(bytes); +} + +std::vector TelegramBuffer::takeFrames() +{ + std::vector result; + ByteArray candidate; + for (unsigned char b: m_rawBuffer.data()) { + if (b == TELEGRAM_FRAME_DELIMETER) { + if (!candidate.empty()) { + result.push_back(candidate); + } + candidate = ByteArray(); + } else { + candidate.append(b); + } + } + std::swap(m_rawBuffer, candidate); + return result; +} diff --git a/src/InteractivePathAnalysis/client/telegrambuffer.h b/src/InteractivePathAnalysis/client/telegrambuffer.h new file mode 100644 index 000000000..c86137999 --- /dev/null +++ b/src/InteractivePathAnalysis/client/telegrambuffer.h @@ -0,0 +1,57 @@ +#pragma once + +#include +#include +#include + +class ByteArray { +public: + static const std::size_t DEFAULT_SIZE_HINT = 1024; + + ByteArray(const char* data) + : m_data(reinterpret_cast(data), + reinterpret_cast(data + strlen(data))) + {} + ByteArray(std::size_t sizeHint = DEFAULT_SIZE_HINT) { + m_data.reserve(sizeHint); + } + + void append(const ByteArray& appendix) { + for (unsigned char b: appendix.data()) { + m_data.push_back(b); + } + } + + void append(unsigned char b) { + m_data.push_back(b); + } + + std::string to_string() const { + return std::string(reinterpret_cast(m_data.data()), m_data.size()); + } + + bool empty() const { return m_data.empty(); } + const std::vector& data() const { return m_data; } + void clear() { m_data.clear(); } + +private: + std::vector m_data; +}; + +class TelegramBuffer +{ + static const std::size_t DEFAULT_SIZE_HINT = 1024; +public: + TelegramBuffer(std::size_t sizeHint = DEFAULT_SIZE_HINT): m_rawBuffer(sizeHint) {} + ~TelegramBuffer()=default; + + void clear() { m_rawBuffer.clear(); } + + void append(const ByteArray&); + std::vector takeFrames(); + + const ByteArray& data() const { return m_rawBuffer; } + +private: + ByteArray m_rawBuffer; +}; diff --git a/src/Main/CMakeLists.txt b/src/Main/CMakeLists.txt index f2fa5069a..08c46a1f3 100644 --- a/src/Main/CMakeLists.txt +++ b/src/Main/CMakeLists.txt @@ -88,6 +88,7 @@ set (SRC_CPP_LIST ../Main/Foedag_ql.cpp ../InteractivePathAnalysis/client/client.cpp ../InteractivePathAnalysis/client/process.cpp ../InteractivePathAnalysis/client/tcpsocket.cpp + ../InteractivePathAnalysis/client/telegrambuffer.cpp ../InteractivePathAnalysis/client/requestcreator.cpp ) @@ -138,6 +139,7 @@ set (SRC_H_LIST ../Main/Foedag.h ../InteractivePathAnalysis/client/process.h ../InteractivePathAnalysis/client/keys.h ../InteractivePathAnalysis/client/tcpsocket.h + ../InteractivePathAnalysis/client/telegrambuffer.h ../InteractivePathAnalysis/client/requestcreator.h )