diff --git a/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java b/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java index 4839f5465..885b39894 100644 --- a/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java +++ b/proxy/src/main/java/com/wavefront/agent/handlers/AbstractReportableEntityHandler.java @@ -196,6 +196,7 @@ public void block(@Nullable T item, @Nullable String message) { @Override public void report(T item) { try { + attemptedCounter.inc(); reportInternal(item); } catch (IllegalArgumentException e) { this.reject(item, e.getMessage() + " (" + serializer.apply(item) + ")"); diff --git a/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java b/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java index dcf87932a..8352dbbfa 100644 --- a/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java +++ b/proxy/src/main/java/com/wavefront/agent/queueing/RetryTaskConverter.java @@ -7,6 +7,12 @@ import com.wavefront.common.TaggedMetricName; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; +import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream; +import org.apache.commons.io.IOUtils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -16,14 +22,6 @@ import java.util.logging.Logger; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream; -import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream; -import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; -import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; -import org.apache.commons.io.IOUtils; /** * A serializer + deserializer of {@link DataSubmissionTask} objects for storage. @@ -36,10 +34,11 @@ public class RetryTaskConverter> implements Task Logger.getLogger(RetryTaskConverter.class.getCanonicalName()); static final byte[] TASK_HEADER = new byte[] {'W', 'F'}; - static final byte FORMAT_RAW = 1; // 'W' 'F' 0x01 0x01 - static final byte FORMAT_GZIP = 2; // 'W' 'F' 0x01 0x02 - static final byte FORMAT_LZ4 = 3; // 'W' 'F' 0x01 0x03 - static final byte WRAPPED = 4; // 'W' 'F' 0x06 0x04 0x01 + static final byte FORMAT_RAW = 1; + static final byte FORMAT_GZIP = 2; + static final byte FORMAT_LZ4_OLD = 3; + static final byte WRAPPED = 4; + static final byte FORMAT_LZ4 = 5; static final byte[] PREFIX = {'W', 'F', 6, 4}; private final ObjectMapper objectMapper = @@ -74,6 +73,10 @@ public T fromBytes(@Nonnull byte[] bytes) { byte compression = header[0] == WRAPPED && bytesToRead > 1 ? header[1] : header[0]; try { switch (compression) { + case FORMAT_LZ4_OLD: + input.skip(21); // old lz4 header, not need with the apache commons implementation + stream = new BlockLZ4CompressorInputStream(input); + break; case FORMAT_LZ4: stream = new BlockLZ4CompressorInputStream(input); break;