From fff4100b4339d432b4055f7bfc5a238fd0ef0783 Mon Sep 17 00:00:00 2001 From: German Laullon Date: Tue, 26 Sep 2023 09:17:53 +0200 Subject: [PATCH] [MONIT-40972 ] lz4-pure-java (#873) --- proxy/pom.xml | 10 ++------- .../AbstractReportableEntityHandler.java | 1 + .../agent/queueing/RetryTaskConverter.java | 22 ++++++++++++------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/proxy/pom.xml b/proxy/pom.xml index 005432c47..4f5d9a57f 100644 --- a/proxy/pom.xml +++ b/proxy/pom.xml @@ -334,7 +334,7 @@ org.apache.commons commons-compress - 1.21 + 1.24.0 org.apache.thrift @@ -535,12 +535,6 @@ 1.3.1 compile - - net.jpountz.lz4 - lz4 - 1.3.0 - compile - com.lmax disruptor @@ -684,4 +678,4 @@ - \ No newline at end of file + diff --git a/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java b/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java index 7104384f9..b3d0cb49c 100644 --- a/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java @@ -197,6 +197,7 @@ public void block(@Nullable T item, @Nullable String message) { @Override public void report(T item) { try { + attemptedCounter.inc(); reportInternal(item); } catch (IllegalArgumentException e) { this.reject(item, e.getMessage() + " (" + serializer.apply(item) + ")"); diff --git a/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java b/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java index bba5ace42..b7b2fa3f7 100644 --- a/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java +++ b/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java @@ -18,8 +18,8 @@ import java.util.zip.GZIPOutputStream; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import net.jpountz.lz4.LZ4BlockInputStream; -import net.jpountz.lz4.LZ4BlockOutputStream; +import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream; import org.apache.commons.io.IOUtils; /** @@ -33,10 +33,11 @@ public class RetryTaskConverter> implements Task Logger.getLogger(RetryTaskConverter.class.getCanonicalName()); static final byte[] TASK_HEADER = new byte[] {'W', 'F'}; - static final byte FORMAT_RAW = 1; // 'W' 'F' 0x01 0x01 - static final byte FORMAT_GZIP = 2; // 'W' 'F' 0x01 0x02 - static final byte FORMAT_LZ4 = 3; // 'W' 'F' 0x01 0x03 - static final byte WRAPPED = 4; // 'W' 'F' 0x06 0x04 0x01 + static final byte FORMAT_RAW = 1; + static final byte FORMAT_GZIP = 2; + static final byte FORMAT_LZ4_OLD = 3; + static final byte WRAPPED = 4; + static final byte FORMAT_LZ4 = 5; static final byte[] PREFIX = {'W', 'F', 6, 4}; private final ObjectMapper objectMapper = @@ -71,8 +72,12 @@ public T fromBytes(@Nonnull byte[] bytes) { byte compression = header[0] == WRAPPED && bytesToRead > 1 ? header[1] : header[0]; try { switch (compression) { + case FORMAT_LZ4_OLD: + input.skip(21); // old lz4 header, not need with the apache commons implementation + stream = new BlockLZ4CompressorInputStream(input); + break; case FORMAT_LZ4: - stream = new LZ4BlockInputStream(input); + stream = new BlockLZ4CompressorInputStream(input); break; case FORMAT_GZIP: stream = new GZIPInputStream(input); @@ -116,7 +121,8 @@ public void serializeToStream(@Nonnull T t, @Nonnull OutputStream bytes) throws case LZ4: bytes.write(FORMAT_LZ4); bytes.write(ByteBuffer.allocate(4).putInt(t.weight()).array()); - LZ4BlockOutputStream lz4BlockOutputStream = new LZ4BlockOutputStream(bytes); + BlockLZ4CompressorOutputStream lz4BlockOutputStream = + new BlockLZ4CompressorOutputStream(bytes); objectMapper.writeValue(lz4BlockOutputStream, t); lz4BlockOutputStream.close(); return;