Skip to content

Commit

Permalink
LZ4 Backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
laullon committed Sep 20, 2023
1 parent 8acdb11 commit ea7c4bc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public void block(@Nullable T item, @Nullable String message) {
@Override
public void report(T item) {
try {
attemptedCounter.inc();
reportInternal(item);
} catch (IllegalArgumentException e) {
this.reject(item, e.getMessage() + " (" + serializer.apply(item) + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import com.wavefront.common.TaggedMetricName;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
import org.apache.commons.io.IOUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -16,14 +22,6 @@
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
import org.apache.commons.io.IOUtils;

/**
* A serializer + deserializer of {@link DataSubmissionTask} objects for storage.
Expand All @@ -36,10 +34,11 @@ public class RetryTaskConverter<T extends DataSubmissionTask<T>> implements Task
Logger.getLogger(RetryTaskConverter.class.getCanonicalName());

static final byte[] TASK_HEADER = new byte[] {'W', 'F'};
static final byte FORMAT_RAW = 1; // 'W' 'F' 0x01 0x01 <payload>
static final byte FORMAT_GZIP = 2; // 'W' 'F' 0x01 0x02 <payload>
static final byte FORMAT_LZ4 = 3; // 'W' 'F' 0x01 0x03 <payload>
static final byte WRAPPED = 4; // 'W' 'F' 0x06 0x04 0x01 <weight> <payload>
static final byte FORMAT_RAW = 1;
static final byte FORMAT_GZIP = 2;
static final byte FORMAT_LZ4_OLD = 3;
static final byte WRAPPED = 4;
static final byte FORMAT_LZ4 = 5;
static final byte[] PREFIX = {'W', 'F', 6, 4};

private final ObjectMapper objectMapper =
Expand Down Expand Up @@ -74,6 +73,10 @@ public T fromBytes(@Nonnull byte[] bytes) {
byte compression = header[0] == WRAPPED && bytesToRead > 1 ? header[1] : header[0];
try {
switch (compression) {
case FORMAT_LZ4_OLD:
input.skip(21); // old lz4 header, not need with the apache commons implementation
stream = new BlockLZ4CompressorInputStream(input);
break;
case FORMAT_LZ4:
stream = new BlockLZ4CompressorInputStream(input);
break;
Expand Down

0 comments on commit ea7c4bc

Please sign in to comment.