Skip to content

Commit

Permalink
[MONIT-40972 ] lz4-pure-java (#873)
Browse files Browse the repository at this point in the history
  • Loading branch information
laullon authored Sep 26, 2023
1 parent 6180330 commit fff4100
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
10 changes: 2 additions & 8 deletions proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
<version>1.24.0</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
Expand Down Expand Up @@ -535,12 +535,6 @@
<version>1.3.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
Expand Down Expand Up @@ -684,4 +678,4 @@
</profile>
</profiles>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void block(@Nullable T item, @Nullable String message) {
@Override
public void report(T item) {
try {
attemptedCounter.inc();
reportInternal(item);
} catch (IllegalArgumentException e) {
this.reject(item, e.getMessage() + " (" + serializer.apply(item) + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
import org.apache.commons.io.IOUtils;

/**
Expand All @@ -33,10 +33,11 @@ public class RetryTaskConverter<T extends DataSubmissionTask<T>> implements Task
Logger.getLogger(RetryTaskConverter.class.getCanonicalName());

static final byte[] TASK_HEADER = new byte[] {'W', 'F'};
static final byte FORMAT_RAW = 1; // 'W' 'F' 0x01 0x01 <payload>
static final byte FORMAT_GZIP = 2; // 'W' 'F' 0x01 0x02 <payload>
static final byte FORMAT_LZ4 = 3; // 'W' 'F' 0x01 0x03 <payload>
static final byte WRAPPED = 4; // 'W' 'F' 0x06 0x04 0x01 <weight> <payload>
static final byte FORMAT_RAW = 1;
static final byte FORMAT_GZIP = 2;
static final byte FORMAT_LZ4_OLD = 3;
static final byte WRAPPED = 4;
static final byte FORMAT_LZ4 = 5;
static final byte[] PREFIX = {'W', 'F', 6, 4};

private final ObjectMapper objectMapper =
Expand Down Expand Up @@ -71,8 +72,12 @@ public T fromBytes(@Nonnull byte[] bytes) {
byte compression = header[0] == WRAPPED && bytesToRead > 1 ? header[1] : header[0];
try {
switch (compression) {
case FORMAT_LZ4_OLD:
input.skip(21); // old lz4 header, not need with the apache commons implementation
stream = new BlockLZ4CompressorInputStream(input);
break;
case FORMAT_LZ4:
stream = new LZ4BlockInputStream(input);
stream = new BlockLZ4CompressorInputStream(input);
break;
case FORMAT_GZIP:
stream = new GZIPInputStream(input);
Expand Down Expand Up @@ -116,7 +121,8 @@ public void serializeToStream(@Nonnull T t, @Nonnull OutputStream bytes) throws
case LZ4:
bytes.write(FORMAT_LZ4);
bytes.write(ByteBuffer.allocate(4).putInt(t.weight()).array());
LZ4BlockOutputStream lz4BlockOutputStream = new LZ4BlockOutputStream(bytes);
BlockLZ4CompressorOutputStream lz4BlockOutputStream =
new BlockLZ4CompressorOutputStream(bytes);
objectMapper.writeValue(lz4BlockOutputStream, t);
lz4BlockOutputStream.close();
return;
Expand Down

0 comments on commit fff4100

Please sign in to comment.