Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MONIT-40972 ] lz4-pure-java #873

Merged
merged 10 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@
<artifactId>okhttp</artifactId>
<version>4.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.24.0</version>
</dependency>
locke-chappel marked this conversation as resolved.
Show resolved Hide resolved
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -533,12 +538,6 @@
<version>1.3.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void block(@Nullable T item, @Nullable String message) {
@Override
public void report(T item) {
try {
attemptedCounter.inc();
reportInternal(item);
} catch (IllegalArgumentException e) {
this.reject(item, e.getMessage() + " (" + serializer.apply(item) + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
import org.apache.commons.io.IOUtils;

/**
Expand All @@ -33,10 +33,11 @@ public class RetryTaskConverter<T extends DataSubmissionTask<T>> implements Task
Logger.getLogger(RetryTaskConverter.class.getCanonicalName());

static final byte[] TASK_HEADER = new byte[] {'W', 'F'};
static final byte FORMAT_RAW = 1; // 'W' 'F' 0x01 0x01 <payload>
static final byte FORMAT_GZIP = 2; // 'W' 'F' 0x01 0x02 <payload>
static final byte FORMAT_LZ4 = 3; // 'W' 'F' 0x01 0x03 <payload>
static final byte WRAPPED = 4; // 'W' 'F' 0x06 0x04 0x01 <weight> <payload>
static final byte FORMAT_RAW = 1;
static final byte FORMAT_GZIP = 2;
static final byte FORMAT_LZ4_OLD = 3;
static final byte WRAPPED = 4;
static final byte FORMAT_LZ4 = 5;
static final byte[] PREFIX = {'W', 'F', 6, 4};

private final ObjectMapper objectMapper =
Expand Down Expand Up @@ -71,8 +72,12 @@ public T fromBytes(@Nonnull byte[] bytes) {
byte compression = header[0] == WRAPPED && bytesToRead > 1 ? header[1] : header[0];
try {
switch (compression) {
case FORMAT_LZ4_OLD:
input.skip(21); // old lz4 header, not need with the apache commons implementation
stream = new BlockLZ4CompressorInputStream(input);
break;
case FORMAT_LZ4:
stream = new LZ4BlockInputStream(input);
stream = new BlockLZ4CompressorInputStream(input);
break;
case FORMAT_GZIP:
stream = new GZIPInputStream(input);
Expand Down Expand Up @@ -116,7 +121,8 @@ public void serializeToStream(@Nonnull T t, @Nonnull OutputStream bytes) throws
case LZ4:
bytes.write(FORMAT_LZ4);
bytes.write(ByteBuffer.allocate(4).putInt(t.weight()).array());
LZ4BlockOutputStream lz4BlockOutputStream = new LZ4BlockOutputStream(bytes);
BlockLZ4CompressorOutputStream lz4BlockOutputStream =
new BlockLZ4CompressorOutputStream(bytes);
objectMapper.writeValue(lz4BlockOutputStream, t);
lz4BlockOutputStream.close();
return;
Expand Down
Loading