Skip to content

Commit

Permalink
LZ4 - add special magic number
Browse files Browse the repository at this point in the history
For better defferentiation between normal and compressed traffic.
  • Loading branch information
BonnyAD9 authored and SiskaPavel committed Aug 23, 2024
1 parent 43b80c8 commit f758321
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
35 changes: 13 additions & 22 deletions output/ipfix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,6 @@ void IPFIXExporter::send_data()
* Loop ends when len = create_data_packet() is 0
*/
while (true) {
// reset for every new connection
// (reset is faster when requested before getWriteBuffer())
if (sequenceNum == 0) {
packetDataBuffer.requestReset();
}

pkt.data = packetDataBuffer.getWriteBuffer(mtu);
if (!pkt.data) {
// this should never happen because packetDataBuffer
Expand Down Expand Up @@ -1163,7 +1157,7 @@ int IPFIXExporter::reconnect()
// compress buffer implementation

CompressBuffer::CompressBuffer() :
shouldCompress(false), shouldReset(false), uncompressed(nullptr),
shouldCompress(false), shouldResetConnection(true), uncompressed(nullptr),
uncompressedSize(0), compressed(nullptr), compressedSize(0), readIndex(0),
readSize(0), lastReadIndex(0), lastReadSize(0), lz4Stream(nullptr) {}

Expand Down Expand Up @@ -1196,7 +1190,7 @@ int CompressBuffer::init(bool compress, size_t compressSize, size_t writeSize)
return -1;
}

shouldReset = true;
shouldResetConnection = true;

return 0;
}
Expand All @@ -1213,10 +1207,8 @@ uint8_t *CompressBuffer::getWriteBuffer(size_t requiredSize) {

if (readIndex != 0 && readSize + requiredSize <= uncompressedSize) {
if (readSize != 0) {
// move the written data in the buffer to the start of the buffer
memmove(uncompressed, uncompressed + readIndex, sizeof(uint8_t) * readSize);
// the old data is definitely invalid
shouldReset = true;
// getWriteBuffer was called multiple times and it is a problem
return nullptr;
}

// if readSize is 0, this just wraps the circular buffer to the begining
Expand All @@ -1236,7 +1228,7 @@ uint8_t *CompressBuffer::getWriteBuffer(size_t requiredSize) {

// reset the stream if the data is not on the same position
if (shouldCompress && newPtr != uncompressed) {
requestReset();
requestConnectionReset();
}

uncompressed = reinterpret_cast<uint8_t *>(newPtr);
Expand Down Expand Up @@ -1286,7 +1278,7 @@ int CompressBuffer::compress() {
auto com = compressed;
auto comSize = compressedSize;

if (shouldReset) {
if (shouldResetConnection) {
// when reset, the buffer must start at 0
if (readIndex != 0) {
memmove(uncompressed, uncompressed + readIndex, readSize);
Expand All @@ -1296,8 +1288,8 @@ int CompressBuffer::compress() {

// fill the info about new stream

// set the 0 to tell the reciever that this is new stream
*reinterpret_cast<uint32_t *>(com) = 0;
// set the magic number
*reinterpret_cast<uint32_t *>(com) = ntohl(LZ4_MAGIC);
com += 4;
comSize -= 4;

Expand All @@ -1307,7 +1299,7 @@ int CompressBuffer::compress() {
htonl(uncompressedSize + compressedSize);
com += sizeof(ipfix_start_compress_header_t);
comSize -= sizeof(ipfix_start_compress_header_t);
shouldReset = false;
shouldResetConnection = false;
}

// set the info about the current block
Expand All @@ -1328,7 +1320,6 @@ int CompressBuffer::compress() {
);

if (res == 0) {
requestReset();
return -1;
}

Expand All @@ -1352,7 +1343,7 @@ uint8_t *CompressBuffer::reviveLast() {
readIndex = lastReadIndex;

if (shouldCompress) {
requestReset();
requestConnectionReset();
}

return uncompressed + readIndex;
Expand All @@ -1362,7 +1353,7 @@ void CompressBuffer::shrinkTo(size_t size) {
readSize = std::min(readSize, size);
}

void CompressBuffer::requestReset() {
void CompressBuffer::requestConnectionReset() {
if (!shouldCompress) {
return;
}
Expand All @@ -1371,7 +1362,7 @@ void CompressBuffer::requestReset() {
if (readSize == 0) {
readIndex = 0;
}
shouldReset = true;
shouldResetConnection = true;
}

void CompressBuffer::close() {
Expand Down Expand Up @@ -1401,7 +1392,7 @@ void CompressBuffer::close() {
lz4Stream = nullptr;
}

shouldReset = false;
shouldResetConnection = false;
shouldCompress = false;
readIndex = 0;
lastReadIndex = 0;
Expand Down
18 changes: 12 additions & 6 deletions output/ipfix.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class CompressBuffer {
// is valid only if no data has been written to the end of the block that
// will be discarded by this call.
//
// The compression can be reset using requestReset(), this will make it so
// The compression can be reset using requestConnectionReset(), this will make it so
// that when decompressing you don't need the previous blocks, but the
// compression will be less effective.
//
Expand All @@ -320,9 +320,9 @@ class CompressBuffer {
// Ideal workflow:
//
// 1. init the buffer with init()
// 2. reset if needed with requestReset()
// 2. reset if needed with requestConnectionReset()
// 3. write to the buffer with getWriteBuffer()
// ideally call getWriteBuffer() only once
// you can call getWriteBuffer() only once
// 4. shrink the data if needed with shrinkTo()
// 5. get the written data with compress() and getCompressed()
// 6. try to avoid calling reviveLast()
Expand Down Expand Up @@ -413,7 +413,7 @@ class CompressBuffer {
* @brief requests that the compression is reset
*
*/
void requestReset();
void requestConnectionReset();

/**
* @brief frees all allocated memory
Expand All @@ -424,14 +424,20 @@ class CompressBuffer {
// the maximum aditional size required to send metadata that are needed to
// to decompress the data, the +4 is there for four 0 bytes that identify
// ipfix_start_compress_header_t
static constexpr size_t C_ADD_SIZE = sizeof(ipfix_compress_header_t) + sizeof(ipfix_start_compress_header_t) + 4;
static constexpr size_t C_ADD_SIZE =
sizeof(ipfix_compress_header_t)
+ sizeof(ipfix_start_compress_header_t)
+ sizeof(uint32_t) * 2;

// LZ4c
static constexpr uint32_t LZ4_MAGIC = 0x4c5a3463;

private:

// false if the buffer is in non-compression mode
bool shouldCompress;
// true if the lz4Stream should be reset
bool shouldReset;
bool shouldResetConnection;

// the buffer with uncompressed data
uint8_t *uncompressed;
Expand Down

0 comments on commit f758321

Please sign in to comment.