From eab8395a378a6ef9b0e715a3bb0b6895843e1dd7 Mon Sep 17 00:00:00 2001 From: gaojieliu Date: Fri, 31 Jan 2020 10:32:16 -0800 Subject: [PATCH] Added a new method to return buffered binary encoder (#19) * Added a new method to return buffered binary encoder In Avro-1.7 and above, the default BinaryEncoder implementation is buffered, which has improved the serialization greatly, but this optimization is absent in Avro-1.4. To improve the serialization performance in Avro-1.4, this code change back-ports the buffered binary encoder implementation from Avro-1.8 to be used in Avro-1.4. By running the test: SerDeMicroBenchmark#testFastAvroSerialization with Avro-1.4, the performance is improved by more than 40% with the new buffered binary encoder (5ms vs 9ms). With the buffered binary encoder, the fast serialization performance across all the Avro versions are very similar now. This code change doesn't change the signature of AvroCompatibilityHelper#newBinaryEncoder, since we would like to keep it backward compatible, instead it adds a new method: AvroCompatibilityHelper#newBufferedBinaryEncoder to return a `Encoder` instance instead of `BinaryEncoder`. The reason of not returning `BinaryEncoder` in the new method is that `BinaryEncoder` has different kinds of contructors across different Avro versions, and it is not easy to backport the buffered binary encoder to work with BinaryEncoder interface in Avro-1.4. * Fixed Javadoc errors --- .../benchmark/AvroGenericSerializer.java | 4 +- .../AvroCompatibilityHelper.java | 5 + .../org/apache/avro/io/Avro14Adapter.java | 18 +- .../org/apache/avro/io/Avro17Adapter.java | 10 + .../java/org/apache/avro/io/AvroAdapter.java | 8 + .../avro/io/avro18/Avro18BinaryData.java | 136 +++++++++ .../avro/io/avro18/Avro18BinaryEncoder.java | 98 +++++++ .../avro18/Avro18BufferedBinaryEncoder.java | 242 ++++++++++++++++ .../apache/avro/io/avro18/Avro18Encoder.java | 264 ++++++++++++++++++ 9 files changed, 782 insertions(+), 3 deletions(-) create mode 100644 avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryData.java create mode 100644 avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryEncoder.java create mode 100644 avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BufferedBinaryEncoder.java create mode 100644 avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18Encoder.java diff --git a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java index b1fed9608..4f51ff962 100644 --- a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java +++ b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java @@ -22,7 +22,7 @@ protected AvroGenericSerializer(DatumWriter datumWriter) { public byte[] serialize(K object) throws Exception { ByteArrayOutputStream output = new ByteArrayOutputStream(); - Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(output); + Encoder encoder = AvroCompatibilityHelper.newBufferedBinaryEncoder(output); try { datumWriter.write(object, encoder); encoder.flush(); @@ -45,7 +45,7 @@ public byte[] serializeObjects(Iterable objects) throws Exception { } private byte[] serializeObjects(Iterable objects, ByteArrayOutputStream output) throws Exception { - Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(output); + Encoder encoder = AvroCompatibilityHelper.newBufferedBinaryEncoder(output); try { objects.forEach(object -> { try { diff --git a/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java b/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java index 0c9511a2c..0d14ca8eb 100644 --- a/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java +++ b/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java @@ -61,6 +61,11 @@ public static BinaryEncoder newBinaryEncoder(OutputStream out) { return FACTORY.newBinaryEncoder(out); } + + public static Encoder newBufferedBinaryEncoder(OutputStream out) { + return FACTORY.newBufferedBinaryEncoder(out); + } + /** * to be migrated to SpecificData.getEncoder() in avro 1.8+ * @param out object output diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java index 68981efd7..7e6d9d8ea 100644 --- a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java @@ -26,6 +26,7 @@ import org.apache.avro.Avro14SchemaAccessHelper; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.avro18.Avro18BufferedBinaryEncoder; import org.apache.avro.io.parsing.ResolvingGrammarGenerator; import org.apache.avro.specific.SpecificDatumReader; import org.codehaus.jackson.JsonNode; @@ -102,6 +103,21 @@ public BinaryEncoder newBinaryEncoder(OutputStream out) { } } + /** + * In Avro-1.4, there is no buffered binary encoder. + * To improve the serialization, we back-port the buffered binary encoder implementation + * as {@link Avro18BufferedBinaryEncoder} from Avro-1.8, with it, the serialization performance will be + * improved a lot. + * Since the back-ported class couldn't extend {@link BinaryEncoder} because of different + * constructors in different Avro versions, so we have to return the {@link Encoder} instead. + * @param out output stream + * @return buffered binary encoder. + */ + @Override + public Encoder newBufferedBinaryEncoder(OutputStream out) { + return new Avro18BufferedBinaryEncoder(out); + } + @Override public GenericData.EnumSymbol newEnumSymbol(Schema avroSchema, String enumValue) { try { @@ -146,7 +162,7 @@ public Object getDefaultValue(Schema.Field field) { // value and then decoding it: if (defaultValue == null) { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - BinaryEncoder encoder = newBinaryEncoder(baos); + Encoder encoder = newBufferedBinaryEncoder(baos); _encodeJsonNode.invoke(null, encoder, field.schema(), json); encoder.flush(); BinaryDecoder decoder = newBinaryDecoder(new ByteArrayInputStream(baos.toByteArray())); diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java index e5ef5aa79..a685f6851 100644 --- a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java @@ -133,6 +133,16 @@ public BinaryEncoder newBinaryEncoder(OutputStream out) { } } + /** + * In Avro-1.7 and above, the default binary encoder returned is buffered. + * @param out output stream + * @return + */ + @Override + public Encoder newBufferedBinaryEncoder(OutputStream out) { + return newBinaryEncoder(out); + } + @Override public GenericData.EnumSymbol newEnumSymbol(Schema avroSchema, String enumValue) { try { diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java b/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java index be8c0b3e0..b5526158c 100644 --- a/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java @@ -30,6 +30,14 @@ public interface AvroAdapter { BinaryEncoder newBinaryEncoder(OutputStream out); + /** + * This method is to explicitly return a buffered binary encoder to improve serialization performance + * specifically for Avro-1.4. + * @param out output stream + * @return buffered binary encoder + */ + Encoder newBufferedBinaryEncoder(OutputStream out); + BinaryDecoder newBinaryDecoder(InputStream in); default JsonEncoder newJsonEncoder(Schema schema, OutputStream out) throws IOException { diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryData.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryData.java new file mode 100644 index 000000000..2c0c39fc1 --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryData.java @@ -0,0 +1,136 @@ +package org.apache.avro.io.avro18; + + +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. + * We also removed a bunch of unrelated methods, so that we don't need to back-port Decoder related code here. + */ +public class Avro18BinaryData { + + private Avro18BinaryData() {} // no public ctor + + /** Encode a boolean to the byte array at the given position. Will throw + * IndexOutOfBounds if the position is not valid. + * @return The number of bytes written to the buffer, 1. + */ + public static int encodeBoolean(boolean b, byte[] buf, int pos) { + buf[pos] = b ? (byte) 1 : (byte) 0; + return 1; + } + + /** Encode an integer to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 5 bytes left in the buffer before calling this method. + * @return The number of bytes written to the buffer, between 1 and 5. + */ + public static int encodeInt(int n, byte[] buf, int pos) { + // move sign to low-order bit, and flip others if negative + n = (n << 1) ^ (n >> 31); + int start = pos; + if ((n & ~0x7F) != 0) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + } + } + } + } + buf[pos++] = (byte) n; + return pos - start; + } + + /** Encode a long to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 10 bytes left in the buffer before calling this method. + * @return The number of bytes written to the buffer, between 1 and 10. + */ + public static int encodeLong(long n, byte[] buf, int pos) { + // move sign to low-order bit, and flip others if negative + n = (n << 1) ^ (n >> 63); + int start = pos; + if ((n & ~0x7FL) != 0) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + } + } + } + } + } + } + } + } + } + buf[pos++] = (byte) n; + return pos - start; + } + + /** Encode a float to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 4 bytes left in the buffer before calling this method. + * @return Returns the number of bytes written to the buffer, 4. + */ + public static int encodeFloat(float f, byte[] buf, int pos) { + int len = 1; + int bits = Float.floatToRawIntBits(f); + // hotspot compiler works well with this variant + buf[pos] = (byte)((bits ) & 0xFF); + buf[pos + len++] = (byte)((bits >>> 8) & 0xFF); + buf[pos + len++] = (byte)((bits >>> 16) & 0xFF); + buf[pos + len++] = (byte)((bits >>> 24) & 0xFF); + return 4; + } + + /** Encode a double to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 8 bytes left in the buffer before calling this method. + * @return Returns the number of bytes written to the buffer, 8. + */ + public static int encodeDouble(double d, byte[] buf, int pos) { + long bits = Double.doubleToRawLongBits(d); + int first = (int)(bits & 0xFFFFFFFF); + int second = (int)((bits >>> 32) & 0xFFFFFFFF); + // the compiler seems to execute this order the best, likely due to + // register allocation -- the lifetime of constants is minimized. + buf[pos] = (byte)((first ) & 0xFF); + buf[pos + 4] = (byte)((second ) & 0xFF); + buf[pos + 5] = (byte)((second >>> 8) & 0xFF); + buf[pos + 1] = (byte)((first >>> 8) & 0xFF); + buf[pos + 2] = (byte)((first >>> 16) & 0xFF); + buf[pos + 6] = (byte)((second >>> 16) & 0xFF); + buf[pos + 7] = (byte)((second >>> 24) & 0xFF); + buf[pos + 3] = (byte)((first >>> 24) & 0xFF); + return 8; + } +} diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryEncoder.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryEncoder.java new file mode 100644 index 000000000..35125bb3c --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryEncoder.java @@ -0,0 +1,98 @@ +package org.apache.avro.io.avro18; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.avro.util.Utf8; + +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. + */ +public abstract class Avro18BinaryEncoder extends Avro18Encoder { + + @Override + public void writeNull() throws IOException {} + + @Override + public void writeString(Utf8 utf8) throws IOException { + this.writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); + } + + @Override + public void writeString(String string) throws IOException { + if (0 == string.length()) { + writeZero(); + return; + } + byte[] bytes = string.getBytes("UTF-8"); + writeInt(bytes.length); + writeFixed(bytes, 0, bytes.length); + } + + @Override + public void writeBytes(ByteBuffer bytes) throws IOException { + int len = bytes.limit() - bytes.position(); + if (0 == len) { + writeZero(); + } else { + writeInt(len); + writeFixed(bytes); + } + } + + @Override + public void writeBytes(byte[] bytes, int start, int len) throws IOException { + if (0 == len) { + writeZero(); + return; + } + this.writeInt(len); + this.writeFixed(bytes, start, len); + } + + @Override + public void writeEnum(int e) throws IOException { + this.writeInt(e); + } + + @Override + public void writeArrayStart() throws IOException {} + + @Override + public void setItemCount(long itemCount) throws IOException { + if (itemCount > 0) { + this.writeLong(itemCount); + } + } + + @Override + public void startItem() throws IOException {} + + @Override + public void writeArrayEnd() throws IOException { + writeZero(); + } + + @Override + public void writeMapStart() throws IOException {} + + @Override + public void writeMapEnd() throws IOException { + writeZero(); + } + + @Override + public void writeIndex(int unionIndex) throws IOException { + writeInt(unionIndex); + } + + /** Write a zero byte to the underlying output. **/ + protected abstract void writeZero() throws IOException; + + /** + * Returns the number of bytes currently buffered by this encoder. If this + * Encoder does not buffer, this will always return zero. + * + * Call {@link #flush()} to empty the buffer to the underlying output. + */ + public abstract int bytesBuffered(); +} diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BufferedBinaryEncoder.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BufferedBinaryEncoder.java new file mode 100644 index 000000000..f7be8895b --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BufferedBinaryEncoder.java @@ -0,0 +1,242 @@ +package org.apache.avro.io.avro18; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; + +import org.apache.avro.AvroRuntimeException; + + +/** + * An {@link Encoder} for Avro's binary encoding. + *

+ * This implementation buffers output to enhance performance. + * Output may not appear on the underlying output until flush() is called. + *

+ * {@literal DirectBinaryEncoder} can be used in place of this implementation if + * the buffering semantics are not desired, and the performance difference + * is acceptable. + *

+ * To construct or reconfigure, use + * {@literal EncoderFactory#binaryEncoder(OutputStream, BinaryEncoder)}. + *

+ * To change the buffer size, configure the factory instance used to + * create instances with {@literal EncoderFactory#configureBufferSize(int)} + * @see Encoder + */ +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. + */ +public class Avro18BufferedBinaryEncoder extends Avro18BinaryEncoder { + /** + * Default buffer size defined in {@literal EncoderFactory} of Avro-1.8 + */ + public static final int DEFAULT_BUFFER_SIZE = 2048; + + private byte[] buf; + private int pos; + private ByteSink sink; + private int bulkLimit; + + public Avro18BufferedBinaryEncoder(OutputStream out) { + this(out, DEFAULT_BUFFER_SIZE); + } + + public Avro18BufferedBinaryEncoder(OutputStream out, int bufferSize) { + configure(out, bufferSize); + } + + Avro18BufferedBinaryEncoder configure(OutputStream out, int bufferSize) { + if (null == out) + throw new NullPointerException("OutputStream cannot be null!"); + if (null != this.sink) { + if ( pos > 0) { + try { + flushBuffer(); + } catch (IOException e) { + throw new AvroRuntimeException("Failure flushing old output", e); + } + } + } + this.sink = new OutputStreamSink(out); + pos = 0; + if (null == buf || buf.length != bufferSize) { + buf = new byte[bufferSize]; + } + bulkLimit = buf.length >>> 1; + if (bulkLimit > 512) { + bulkLimit = 512; + } + return this; + } + + public void init(OutputStream out) throws IOException { + flush(); + configure(out, DEFAULT_BUFFER_SIZE); + } + + @Override + public void flush() throws IOException { + flushBuffer(); + sink.innerFlush(); + } + + /** Flushes the internal buffer to the underlying output. + * Does not flush the underlying output. + */ + private void flushBuffer() throws IOException { + if (pos > 0) { + sink.innerWrite(buf, 0, pos); + pos = 0; + } + } + + /** Ensures that the buffer has at least num bytes free to write to between its + * current position and the end. This will not expand the buffer larger than + * its current size, for writes larger than or near to the size of the buffer, + * we flush the buffer and write directly to the output, bypassing the buffer. + * @param num + * @throws IOException + */ + private void ensureBounds(int num) throws IOException { + int remaining = buf.length - pos; + if (remaining < num) { + flushBuffer(); + } + } + + @Override + public void writeBoolean(boolean b) throws IOException { + // inlined, shorter version of ensureBounds + if (buf.length == pos) { + flushBuffer(); + } + pos += Avro18BinaryData.encodeBoolean(b, buf, pos); + } + + @Override + public void writeInt(int n) throws IOException { + ensureBounds(5); + pos += Avro18BinaryData.encodeInt(n, buf, pos); + } + + @Override + public void writeLong(long n) throws IOException { + ensureBounds(10); + pos += Avro18BinaryData.encodeLong(n, buf, pos); + } + + @Override + public void writeFloat(float f) throws IOException { + ensureBounds(4); + pos += Avro18BinaryData.encodeFloat(f, buf, pos); + } + + @Override + public void writeDouble(double d) throws IOException { + ensureBounds(8); + pos += Avro18BinaryData.encodeDouble(d, buf, pos); + } + + @Override + public void writeFixed(byte[] bytes, int start, int len) throws IOException { + if (len > bulkLimit) { + //too big, write direct + flushBuffer(); + sink.innerWrite(bytes, start, len); + return; + } + ensureBounds(len); + System.arraycopy(bytes, start, buf, pos, len); + pos += len; + } + + @Override + public void writeFixed(ByteBuffer bytes) throws IOException { + if (!bytes.hasArray() && bytes.remaining() > bulkLimit) { + flushBuffer(); + sink.innerWrite(bytes); // bypass the buffer + } else { + super.writeFixed(bytes); + } + } + + @Override + protected void writeZero() throws IOException { + writeByte(0); + } + + private void writeByte(int b) throws IOException { + if (pos == buf.length) { + flushBuffer(); + } + buf[pos++] = (byte) (b & 0xFF); + } + + @Override + public int bytesBuffered() { + return pos; + } + + /** + * ByteSink abstracts the destination of written data from the core workings + * of BinaryEncoder. + *

+ * Currently the only destination option is an OutputStream, but we may later + * want to handle other constructs or specialize for certain OutputStream + * Implementations such as ByteBufferOutputStream. + *

+ */ + private abstract static class ByteSink { + protected ByteSink() {} + /** Write data from bytes, starting at off, for len bytes **/ + protected abstract void innerWrite(byte[] bytes, int off, int len) throws IOException; + + protected abstract void innerWrite(ByteBuffer buff) throws IOException; + + /** Flush the underlying output, if supported **/ + protected abstract void innerFlush() throws IOException; + } + + static class OutputStreamSink extends ByteSink { + private final OutputStream out; + private final WritableByteChannel channel; + private OutputStreamSink(OutputStream out) { + super(); + this.out = out; + channel = Channels.newChannel(out); + } + @Override + protected void innerWrite(byte[] bytes, int off, int len) + throws IOException { + out.write(bytes, off, len); + } + @Override + protected void innerFlush() throws IOException { + out.flush(); + } + @Override + protected void innerWrite(ByteBuffer buff) throws IOException { + channel.write(buff); + } + } +} diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18Encoder.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18Encoder.java new file mode 100644 index 000000000..42837e4a1 --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18Encoder.java @@ -0,0 +1,264 @@ +package org.apache.avro.io.avro18; + +import java.io.Flushable; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.avro.AvroTypeException; +import org.apache.avro.io.Encoder; +import org.apache.avro.util.Utf8; + +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. + */ +public abstract class Avro18Encoder extends Encoder implements Flushable { + + /** + * "Writes" a null value. (Doesn't actually write anything, but + * advances the state of the parser if this class is stateful.) + * @throws AvroTypeException If this is a stateful writer and a + * null is not expected + */ + public abstract void writeNull() throws IOException; + + /** + * Write a boolean value. + * @throws AvroTypeException If this is a stateful writer and a + * boolean is not expected + */ + public abstract void writeBoolean(boolean b) throws IOException; + + /** + * Writes a 32-bit integer. + * @throws AvroTypeException If this is a stateful writer and an + * integer is not expected + */ + public abstract void writeInt(int n) throws IOException; + + /** + * Write a 64-bit integer. + * @throws AvroTypeException If this is a stateful writer and a + * long is not expected + */ + public abstract void writeLong(long n) throws IOException; + + /** Write a float. + * @throws IOException + * @throws AvroTypeException If this is a stateful writer and a + * float is not expected + */ + public abstract void writeFloat(float f) throws IOException; + + /** + * Write a double. + * @throws AvroTypeException If this is a stateful writer and a + * double is not expected + */ + public abstract void writeDouble(double d) throws IOException; + + /** + * Write a Unicode character string. + * @throws AvroTypeException If this is a stateful writer and a + * char-string is not expected + */ + public abstract void writeString(Utf8 utf8) throws IOException; + + /** + * Write a Unicode character string. The default implementation converts + * the String to a {@link org.apache.avro.util.Utf8}. Some Encoder + * implementations may want to do something different as a performance optimization. + * @throws AvroTypeException If this is a stateful writer and a + * char-string is not expected + */ + public void writeString(String str) throws IOException { + writeString(new Utf8(str)); + } + + /** + * Write a Unicode character string. If the CharSequence is an + * {@link org.apache.avro.util.Utf8} it writes this directly, otherwise + * the CharSequence is converted to a String via toString() and written. + * @throws AvroTypeException If this is a stateful writer and a + * char-string is not expected + */ + public void writeString(CharSequence charSequence) throws IOException { + if (charSequence instanceof Utf8) + writeString((Utf8)charSequence); + else + writeString(charSequence.toString()); + } + + /** + * Write a byte string. + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + */ + public abstract void writeBytes(ByteBuffer bytes) throws IOException; + + /** + * Write a byte string. + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + */ + public abstract void writeBytes(byte[] bytes, int start, int len) throws IOException; + + /** + * Writes a byte string. + * Equivalent to writeBytes(bytes, 0, bytes.length) + * @throws IOException + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + */ + public void writeBytes(byte[] bytes) throws IOException { + writeBytes(bytes, 0, bytes.length); + } + + /** + * Writes a fixed size binary object. + * @param bytes The contents to write + * @param start The position within bytes where the contents + * start. + * @param len The number of bytes to write. + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + * @throws IOException + */ + public abstract void writeFixed(byte[] bytes, int start, int len) throws IOException; + + /** + * A shorthand for writeFixed(bytes, 0, bytes.length) + * @param bytes + */ + public void writeFixed(byte[] bytes) throws IOException { + writeFixed(bytes, 0, bytes.length); + } + + /** Writes a fixed from a ByteBuffer. */ + public void writeFixed(ByteBuffer bytes) throws IOException { + int pos = bytes.position(); + int len = bytes.limit() - pos; + if (bytes.hasArray()) { + writeFixed(bytes.array(), bytes.arrayOffset() + pos, len); + } else { + byte[] b = new byte[len]; + bytes.duplicate().get(b, 0, len); + writeFixed(b, 0, len); + } + } + + /** + * Writes an enumeration. + * @param e + * @throws AvroTypeException If this is a stateful writer and an enumeration + * is not expected or the e is out of range. + * @throws IOException + */ + public abstract void writeEnum(int e) throws IOException; + + /** Call this method to start writing an array. + * + * When starting to serialize an array, call {@link + * #writeArrayStart}. Then, before writing any data for any item + * call {@link #setItemCount} followed by a sequence of + * {@link #startItem()} and the item itself. The number of + * {@link #startItem()} should match the number specified in + * {@link #setItemCount}. + * When actually writing the data of the item, you can call any {@link + * Encoder} method (e.g., {@link #writeLong}). When all items + * of the array have been written, call {@link #writeArrayEnd}. + * + * As an example, let's say you want to write an array of records, + * the record consisting of an Long field and a Boolean field. + * Your code would look something like this: + *

+   *  out.writeArrayStart();
+   *  out.setItemCount(list.size());
+   *  for (Record r : list) {
+   *    out.startItem();
+   *    out.writeLong(r.longField);
+   *    out.writeBoolean(r.boolField);
+   *  }
+   *  out.writeArrayEnd();
+   *  
+ * @throws AvroTypeException If this is a stateful writer and an + * array is not expected + */ + public abstract void writeArrayStart() throws IOException; + + /** + * Call this method before writing a batch of items in an array or a map. + * Then for each item, call {@link #startItem()} followed by any of the + * other write methods of {@link Encoder}. The number of calls + * to {@link #startItem()} must be equal to the count specified + * in {@link #setItemCount}. Once a batch is completed you + * can start another batch with {@link #setItemCount}. + * + * @param itemCount The number of {@link #startItem()} calls to follow. + * @throws IOException + */ + public abstract void setItemCount(long itemCount) throws IOException; + + /** + * Start a new item of an array or map. + * See {@link #writeArrayStart} for usage information. + * @throws AvroTypeException If called outside of an array or map context + */ + public abstract void startItem() throws IOException; + + /** + * Call this method to finish writing an array. + * See {@link #writeArrayStart} for usage information. + * + * @throws AvroTypeException If items written does not match count + * provided to {@link #writeArrayStart} + * @throws AvroTypeException If not currently inside an array + */ + public abstract void writeArrayEnd() throws IOException; + + /** + * Call this to start a new map. See + * {@link #writeArrayStart} for details on usage. + * + * As an example of usage, let's say you want to write a map of + * records, the record consisting of an Long field and a Boolean + * field. Your code would look something like this: + *
+   * out.writeMapStart();
+   * out.setItemCount(list.size());
+   * for (Map.Entry<String,Record> entry : map.entrySet()) {
+   *   out.startItem();
+   *   out.writeString(entry.getKey());
+   *   out.writeLong(entry.getValue().longField);
+   *   out.writeBoolean(entry.getValue().boolField);
+   * }
+   * out.writeMapEnd();
+   * 
+ * @throws AvroTypeException If this is a stateful writer and a + * map is not expected + */ + public abstract void writeMapStart() throws IOException; + + /** + * Call this method to terminate the inner-most, currently-opened + * map. See {@link #writeArrayStart} for more details. + * + * @throws AvroTypeException If items written does not match count + * provided to {@link #writeMapStart} + * @throws AvroTypeException If not currently inside a map + */ + public abstract void writeMapEnd() throws IOException; + + /** Call this method to write the tag of a union. + * + * As an example of usage, let's say you want to write a union, + * whose second branch is a record consisting of an Long field and + * a Boolean field. Your code would look something like this: + *
+   * out.writeIndex(1);
+   * out.writeLong(record.longField);
+   * out.writeBoolean(record.boolField);
+   * 
+ * @throws AvroTypeException If this is a stateful writer and a + * map is not expected + */ + public abstract void writeIndex(int unionIndex) throws IOException; +}