Skip to content

Commit

Permalink
Added a new method to return buffered binary encoder (#19)
Browse files Browse the repository at this point in the history
* Added a new method to return buffered binary encoder

In Avro-1.7 and above, the default BinaryEncoder implementation is buffered,
which has improved the serialization greatly, but this optimization is
absent in Avro-1.4.
To improve the serialization performance in Avro-1.4, this code change back-ports
the buffered binary encoder implementation from Avro-1.8 to be used
in Avro-1.4.
By running the test: SerDeMicroBenchmark#testFastAvroSerialization with Avro-1.4,
the performance is improved by more than 40% with the new buffered binary encoder (5ms vs 9ms).
With the buffered binary encoder, the fast serialization performance across
all the Avro versions are very similar now.

This code change doesn't change the signature of AvroCompatibilityHelper#newBinaryEncoder,
since we would like to keep it backward compatible, instead it adds a new method:
AvroCompatibilityHelper#newBufferedBinaryEncoder to return a `Encoder` instance instead
of `BinaryEncoder`.
The reason of not returning `BinaryEncoder` in the new method is that `BinaryEncoder` has
different kinds of contructors across different Avro versions, and it is not easy to backport
the buffered binary encoder to work with BinaryEncoder interface in Avro-1.4.

* Fixed Javadoc errors
  • Loading branch information
gaojieliu authored Jan 31, 2020
1 parent c95786f commit eab8395
Show file tree
Hide file tree
Showing 9 changed files with 782 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected AvroGenericSerializer(DatumWriter datumWriter) {

public byte[] serialize(K object) throws Exception {
ByteArrayOutputStream output = new ByteArrayOutputStream();
Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(output);
Encoder encoder = AvroCompatibilityHelper.newBufferedBinaryEncoder(output);
try {
datumWriter.write(object, encoder);
encoder.flush();
Expand All @@ -45,7 +45,7 @@ public byte[] serializeObjects(Iterable<K> objects) throws Exception {
}

private byte[] serializeObjects(Iterable<K> objects, ByteArrayOutputStream output) throws Exception {
Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(output);
Encoder encoder = AvroCompatibilityHelper.newBufferedBinaryEncoder(output);
try {
objects.forEach(object -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public static BinaryEncoder newBinaryEncoder(OutputStream out) {
return FACTORY.newBinaryEncoder(out);
}


public static Encoder newBufferedBinaryEncoder(OutputStream out) {
return FACTORY.newBufferedBinaryEncoder(out);
}

/**
* to be migrated to SpecificData.getEncoder() in avro 1.8+
* @param out object output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.avro.Avro14SchemaAccessHelper;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.avro18.Avro18BufferedBinaryEncoder;
import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
import org.apache.avro.specific.SpecificDatumReader;
import org.codehaus.jackson.JsonNode;
Expand Down Expand Up @@ -102,6 +103,21 @@ public BinaryEncoder newBinaryEncoder(OutputStream out) {
}
}

/**
* In Avro-1.4, there is no buffered binary encoder.
* To improve the serialization, we back-port the buffered binary encoder implementation
* as {@link Avro18BufferedBinaryEncoder} from Avro-1.8, with it, the serialization performance will be
* improved a lot.
* Since the back-ported class couldn't extend {@link BinaryEncoder} because of different
* constructors in different Avro versions, so we have to return the {@link Encoder} instead.
* @param out output stream
* @return buffered binary encoder.
*/
@Override
public Encoder newBufferedBinaryEncoder(OutputStream out) {
return new Avro18BufferedBinaryEncoder(out);
}

@Override
public GenericData.EnumSymbol newEnumSymbol(Schema avroSchema, String enumValue) {
try {
Expand Down Expand Up @@ -146,7 +162,7 @@ public Object getDefaultValue(Schema.Field field) {
// value and then decoding it:
if (defaultValue == null) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
BinaryEncoder encoder = newBinaryEncoder(baos);
Encoder encoder = newBufferedBinaryEncoder(baos);
_encodeJsonNode.invoke(null, encoder, field.schema(), json);
encoder.flush();
BinaryDecoder decoder = newBinaryDecoder(new ByteArrayInputStream(baos.toByteArray()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ public BinaryEncoder newBinaryEncoder(OutputStream out) {
}
}

/**
* In Avro-1.7 and above, the default binary encoder returned is buffered.
* @param out output stream
* @return
*/
@Override
public Encoder newBufferedBinaryEncoder(OutputStream out) {
return newBinaryEncoder(out);
}

@Override
public GenericData.EnumSymbol newEnumSymbol(Schema avroSchema, String enumValue) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public interface AvroAdapter {

BinaryEncoder newBinaryEncoder(OutputStream out);

/**
* This method is to explicitly return a buffered binary encoder to improve serialization performance
* specifically for Avro-1.4.
* @param out output stream
* @return buffered binary encoder
*/
Encoder newBufferedBinaryEncoder(OutputStream out);

BinaryDecoder newBinaryDecoder(InputStream in);

default JsonEncoder newJsonEncoder(Schema schema, OutputStream out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package org.apache.avro.io.avro18;


/**
* Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance.
* We also removed a bunch of unrelated methods, so that we don't need to back-port Decoder related code here.
*/
public class Avro18BinaryData {

private Avro18BinaryData() {} // no public ctor

/** Encode a boolean to the byte array at the given position. Will throw
* IndexOutOfBounds if the position is not valid.
* @return The number of bytes written to the buffer, 1.
*/
public static int encodeBoolean(boolean b, byte[] buf, int pos) {
buf[pos] = b ? (byte) 1 : (byte) 0;
return 1;
}

/** Encode an integer to the byte array at the given position. Will throw
* IndexOutOfBounds if it overflows. Users should ensure that there are at
* least 5 bytes left in the buffer before calling this method.
* @return The number of bytes written to the buffer, between 1 and 5.
*/
public static int encodeInt(int n, byte[] buf, int pos) {
// move sign to low-order bit, and flip others if negative
n = (n << 1) ^ (n >> 31);
int start = pos;
if ((n & ~0x7F) != 0) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
}
}
}
}
buf[pos++] = (byte) n;
return pos - start;
}

/** Encode a long to the byte array at the given position. Will throw
* IndexOutOfBounds if it overflows. Users should ensure that there are at
* least 10 bytes left in the buffer before calling this method.
* @return The number of bytes written to the buffer, between 1 and 10.
*/
public static int encodeLong(long n, byte[] buf, int pos) {
// move sign to low-order bit, and flip others if negative
n = (n << 1) ^ (n >> 63);
int start = pos;
if ((n & ~0x7FL) != 0) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
if (n > 0x7F) {
buf[pos++] = (byte)((n | 0x80) & 0xFF);
n >>>= 7;
}
}
}
}
}
}
}
}
}
buf[pos++] = (byte) n;
return pos - start;
}

/** Encode a float to the byte array at the given position. Will throw
* IndexOutOfBounds if it overflows. Users should ensure that there are at
* least 4 bytes left in the buffer before calling this method.
* @return Returns the number of bytes written to the buffer, 4.
*/
public static int encodeFloat(float f, byte[] buf, int pos) {
int len = 1;
int bits = Float.floatToRawIntBits(f);
// hotspot compiler works well with this variant
buf[pos] = (byte)((bits ) & 0xFF);
buf[pos + len++] = (byte)((bits >>> 8) & 0xFF);
buf[pos + len++] = (byte)((bits >>> 16) & 0xFF);
buf[pos + len++] = (byte)((bits >>> 24) & 0xFF);
return 4;
}

/** Encode a double to the byte array at the given position. Will throw
* IndexOutOfBounds if it overflows. Users should ensure that there are at
* least 8 bytes left in the buffer before calling this method.
* @return Returns the number of bytes written to the buffer, 8.
*/
public static int encodeDouble(double d, byte[] buf, int pos) {
long bits = Double.doubleToRawLongBits(d);
int first = (int)(bits & 0xFFFFFFFF);
int second = (int)((bits >>> 32) & 0xFFFFFFFF);
// the compiler seems to execute this order the best, likely due to
// register allocation -- the lifetime of constants is minimized.
buf[pos] = (byte)((first ) & 0xFF);
buf[pos + 4] = (byte)((second ) & 0xFF);
buf[pos + 5] = (byte)((second >>> 8) & 0xFF);
buf[pos + 1] = (byte)((first >>> 8) & 0xFF);
buf[pos + 2] = (byte)((first >>> 16) & 0xFF);
buf[pos + 6] = (byte)((second >>> 16) & 0xFF);
buf[pos + 7] = (byte)((second >>> 24) & 0xFF);
buf[pos + 3] = (byte)((first >>> 24) & 0xFF);
return 8;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.apache.avro.io.avro18;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.util.Utf8;

/**
* Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance.
*/
public abstract class Avro18BinaryEncoder extends Avro18Encoder {

@Override
public void writeNull() throws IOException {}

@Override
public void writeString(Utf8 utf8) throws IOException {
this.writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
}

@Override
public void writeString(String string) throws IOException {
if (0 == string.length()) {
writeZero();
return;
}
byte[] bytes = string.getBytes("UTF-8");
writeInt(bytes.length);
writeFixed(bytes, 0, bytes.length);
}

@Override
public void writeBytes(ByteBuffer bytes) throws IOException {
int len = bytes.limit() - bytes.position();
if (0 == len) {
writeZero();
} else {
writeInt(len);
writeFixed(bytes);
}
}

@Override
public void writeBytes(byte[] bytes, int start, int len) throws IOException {
if (0 == len) {
writeZero();
return;
}
this.writeInt(len);
this.writeFixed(bytes, start, len);
}

@Override
public void writeEnum(int e) throws IOException {
this.writeInt(e);
}

@Override
public void writeArrayStart() throws IOException {}

@Override
public void setItemCount(long itemCount) throws IOException {
if (itemCount > 0) {
this.writeLong(itemCount);
}
}

@Override
public void startItem() throws IOException {}

@Override
public void writeArrayEnd() throws IOException {
writeZero();
}

@Override
public void writeMapStart() throws IOException {}

@Override
public void writeMapEnd() throws IOException {
writeZero();
}

@Override
public void writeIndex(int unionIndex) throws IOException {
writeInt(unionIndex);
}

/** Write a zero byte to the underlying output. **/
protected abstract void writeZero() throws IOException;

/**
* Returns the number of bytes currently buffered by this encoder. If this
* Encoder does not buffer, this will always return zero.
*
* Call {@link #flush()} to empty the buffer to the underlying output.
*/
public abstract int bytesBuffered();
}
Loading

0 comments on commit eab8395

Please sign in to comment.