From f7583214c0544db533f74e58876aa4b375b4ddfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Anton=C3=ADn=20=C5=A0tigler?= Date: Thu, 28 Mar 2024 13:25:07 +0100 Subject: [PATCH] LZ4 - add special magic number For better defferentiation between normal and compressed traffic. --- output/ipfix.cpp | 35 +++++++++++++---------------------- output/ipfix.hpp | 18 ++++++++++++------ 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/output/ipfix.cpp b/output/ipfix.cpp index af6c6784..ca71db4a 100644 --- a/output/ipfix.cpp +++ b/output/ipfix.cpp @@ -814,12 +814,6 @@ void IPFIXExporter::send_data() * Loop ends when len = create_data_packet() is 0 */ while (true) { - // reset for every new connection - // (reset is faster when requested before getWriteBuffer()) - if (sequenceNum == 0) { - packetDataBuffer.requestReset(); - } - pkt.data = packetDataBuffer.getWriteBuffer(mtu); if (!pkt.data) { // this should never happen because packetDataBuffer @@ -1163,7 +1157,7 @@ int IPFIXExporter::reconnect() // compress buffer implementation CompressBuffer::CompressBuffer() : - shouldCompress(false), shouldReset(false), uncompressed(nullptr), + shouldCompress(false), shouldResetConnection(true), uncompressed(nullptr), uncompressedSize(0), compressed(nullptr), compressedSize(0), readIndex(0), readSize(0), lastReadIndex(0), lastReadSize(0), lz4Stream(nullptr) {} @@ -1196,7 +1190,7 @@ int CompressBuffer::init(bool compress, size_t compressSize, size_t writeSize) return -1; } - shouldReset = true; + shouldResetConnection = true; return 0; } @@ -1213,10 +1207,8 @@ uint8_t *CompressBuffer::getWriteBuffer(size_t requiredSize) { if (readIndex != 0 && readSize + requiredSize <= uncompressedSize) { if (readSize != 0) { - // move the written data in the buffer to the start of the buffer - memmove(uncompressed, uncompressed + readIndex, sizeof(uint8_t) * readSize); - // the old data is definitely invalid - shouldReset = true; + // getWriteBuffer was called multiple times and it is a problem + return nullptr; } // if readSize is 0, this just wraps the circular buffer to the begining @@ -1236,7 +1228,7 @@ uint8_t *CompressBuffer::getWriteBuffer(size_t requiredSize) { // reset the stream if the data is not on the same position if (shouldCompress && newPtr != uncompressed) { - requestReset(); + requestConnectionReset(); } uncompressed = reinterpret_cast(newPtr); @@ -1286,7 +1278,7 @@ int CompressBuffer::compress() { auto com = compressed; auto comSize = compressedSize; - if (shouldReset) { + if (shouldResetConnection) { // when reset, the buffer must start at 0 if (readIndex != 0) { memmove(uncompressed, uncompressed + readIndex, readSize); @@ -1296,8 +1288,8 @@ int CompressBuffer::compress() { // fill the info about new stream - // set the 0 to tell the reciever that this is new stream - *reinterpret_cast(com) = 0; + // set the magic number + *reinterpret_cast(com) = ntohl(LZ4_MAGIC); com += 4; comSize -= 4; @@ -1307,7 +1299,7 @@ int CompressBuffer::compress() { htonl(uncompressedSize + compressedSize); com += sizeof(ipfix_start_compress_header_t); comSize -= sizeof(ipfix_start_compress_header_t); - shouldReset = false; + shouldResetConnection = false; } // set the info about the current block @@ -1328,7 +1320,6 @@ int CompressBuffer::compress() { ); if (res == 0) { - requestReset(); return -1; } @@ -1352,7 +1343,7 @@ uint8_t *CompressBuffer::reviveLast() { readIndex = lastReadIndex; if (shouldCompress) { - requestReset(); + requestConnectionReset(); } return uncompressed + readIndex; @@ -1362,7 +1353,7 @@ void CompressBuffer::shrinkTo(size_t size) { readSize = std::min(readSize, size); } -void CompressBuffer::requestReset() { +void CompressBuffer::requestConnectionReset() { if (!shouldCompress) { return; } @@ -1371,7 +1362,7 @@ void CompressBuffer::requestReset() { if (readSize == 0) { readIndex = 0; } - shouldReset = true; + shouldResetConnection = true; } void CompressBuffer::close() { @@ -1401,7 +1392,7 @@ void CompressBuffer::close() { lz4Stream = nullptr; } - shouldReset = false; + shouldResetConnection = false; shouldCompress = false; readIndex = 0; lastReadIndex = 0; diff --git a/output/ipfix.hpp b/output/ipfix.hpp index 3933f1d2..3070b09d 100644 --- a/output/ipfix.hpp +++ b/output/ipfix.hpp @@ -308,7 +308,7 @@ class CompressBuffer { // is valid only if no data has been written to the end of the block that // will be discarded by this call. // - // The compression can be reset using requestReset(), this will make it so + // The compression can be reset using requestConnectionReset(), this will make it so // that when decompressing you don't need the previous blocks, but the // compression will be less effective. // @@ -320,9 +320,9 @@ class CompressBuffer { // Ideal workflow: // // 1. init the buffer with init() - // 2. reset if needed with requestReset() + // 2. reset if needed with requestConnectionReset() // 3. write to the buffer with getWriteBuffer() - // ideally call getWriteBuffer() only once + // you can call getWriteBuffer() only once // 4. shrink the data if needed with shrinkTo() // 5. get the written data with compress() and getCompressed() // 6. try to avoid calling reviveLast() @@ -413,7 +413,7 @@ class CompressBuffer { * @brief requests that the compression is reset * */ - void requestReset(); + void requestConnectionReset(); /** * @brief frees all allocated memory @@ -424,14 +424,20 @@ class CompressBuffer { // the maximum aditional size required to send metadata that are needed to // to decompress the data, the +4 is there for four 0 bytes that identify // ipfix_start_compress_header_t - static constexpr size_t C_ADD_SIZE = sizeof(ipfix_compress_header_t) + sizeof(ipfix_start_compress_header_t) + 4; + static constexpr size_t C_ADD_SIZE = + sizeof(ipfix_compress_header_t) + + sizeof(ipfix_start_compress_header_t) + + sizeof(uint32_t) * 2; + + // LZ4c + static constexpr uint32_t LZ4_MAGIC = 0x4c5a3463; private: // false if the buffer is in non-compression mode bool shouldCompress; // true if the lz4Stream should be reset - bool shouldReset; + bool shouldResetConnection; // the buffer with uncompressed data uint8_t *uncompressed;