diff --git a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java index b1fed9608..4f51ff962 100644 --- a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java +++ b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/AvroGenericSerializer.java @@ -22,7 +22,7 @@ protected AvroGenericSerializer(DatumWriter datumWriter) { public byte[] serialize(K object) throws Exception { ByteArrayOutputStream output = new ByteArrayOutputStream(); - Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(output); + Encoder encoder = AvroCompatibilityHelper.newBufferedBinaryEncoder(output); try { datumWriter.write(object, encoder); encoder.flush(); @@ -45,7 +45,7 @@ public byte[] serializeObjects(Iterable objects) throws Exception { } private byte[] serializeObjects(Iterable objects, ByteArrayOutputStream output) throws Exception { - Encoder encoder = AvroCompatibilityHelper.newBinaryEncoder(output); + Encoder encoder = AvroCompatibilityHelper.newBufferedBinaryEncoder(output); try { objects.forEach(object -> { try { diff --git a/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java b/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java index 0c9511a2c..0d14ca8eb 100644 --- a/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java +++ b/avro-migration-helper/src/main/java/com/linkedin/avro/compatibility/AvroCompatibilityHelper.java @@ -61,6 +61,11 @@ public static BinaryEncoder newBinaryEncoder(OutputStream out) { return FACTORY.newBinaryEncoder(out); } + + public static Encoder newBufferedBinaryEncoder(OutputStream out) { + return FACTORY.newBufferedBinaryEncoder(out); + } + /** * to be migrated to SpecificData.getEncoder() in avro 1.8+ * 
@param out object output diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java index 68981efd7..7e6d9d8ea 100644 --- a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro14Adapter.java @@ -26,6 +26,7 @@ import org.apache.avro.Avro14SchemaAccessHelper; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.avro18.Avro18BufferedBinaryEncoder; import org.apache.avro.io.parsing.ResolvingGrammarGenerator; import org.apache.avro.specific.SpecificDatumReader; import org.codehaus.jackson.JsonNode; @@ -102,6 +103,21 @@ public BinaryEncoder newBinaryEncoder(OutputStream out) { } } + /** + * In Avro-1.4, there is no buffered binary encoder. + * To improve the serialization, we back-port the buffered binary encoder implementation + * as {@link Avro18BufferedBinaryEncoder} from Avro-1.8, with it, the serialization performance will be + * improved a lot. + * Since the back-ported class couldn't extend {@link BinaryEncoder} because of different + * constructors in different Avro versions, so we have to return the {@link Encoder} instead. + * @param out output stream + * @return buffered binary encoder. 
+ */ + @Override + public Encoder newBufferedBinaryEncoder(OutputStream out) { + return new Avro18BufferedBinaryEncoder(out); + } + @Override public GenericData.EnumSymbol newEnumSymbol(Schema avroSchema, String enumValue) { try { @@ -146,7 +162,7 @@ public Object getDefaultValue(Schema.Field field) { // value and then decoding it: if (defaultValue == null) { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - BinaryEncoder encoder = newBinaryEncoder(baos); + Encoder encoder = newBufferedBinaryEncoder(baos); _encodeJsonNode.invoke(null, encoder, field.schema(), json); encoder.flush(); BinaryDecoder decoder = newBinaryDecoder(new ByteArrayInputStream(baos.toByteArray())); diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java index e5ef5aa79..a685f6851 100644 --- a/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/Avro17Adapter.java @@ -133,6 +133,16 @@ public BinaryEncoder newBinaryEncoder(OutputStream out) { } } + /** + * In Avro-1.7 and above, the default binary encoder returned is buffered. 
+ * @param out output stream + * @return buffered binary encoder + */ + @Override + public Encoder newBufferedBinaryEncoder(OutputStream out) { + return newBinaryEncoder(out); + } + @Override public GenericData.EnumSymbol newEnumSymbol(Schema avroSchema, String enumValue) { try { diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java b/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java index be8c0b3e0..b5526158c 100644 --- a/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/AvroAdapter.java @@ -30,6 +30,14 @@ public interface AvroAdapter { BinaryEncoder newBinaryEncoder(OutputStream out); + /** + * This method is to explicitly return a buffered binary encoder to improve serialization performance + * specifically for Avro-1.4. + * @param out output stream + * @return buffered binary encoder + */ + Encoder newBufferedBinaryEncoder(OutputStream out); + BinaryDecoder newBinaryDecoder(InputStream in); default JsonEncoder newJsonEncoder(Schema schema, OutputStream out) throws IOException { diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryData.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryData.java new file mode 100644 index 000000000..2c0c39fc1 --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryData.java @@ -0,0 +1,136 @@ +package org.apache.avro.io.avro18; + + +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. + * We also removed a bunch of unrelated methods, so that we don't need to back-port Decoder related code here. + */ +public class Avro18BinaryData { + + private Avro18BinaryData() {} // no public ctor + + /** Encode a boolean to the byte array at the given position. Will throw + * IndexOutOfBounds if the position is not valid. 
+ * @return The number of bytes written to the buffer, 1. + */ + public static int encodeBoolean(boolean b, byte[] buf, int pos) { + buf[pos] = b ? (byte) 1 : (byte) 0; + return 1; + } + + /** Encode an integer to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 5 bytes left in the buffer before calling this method. + * @return The number of bytes written to the buffer, between 1 and 5. + */ + public static int encodeInt(int n, byte[] buf, int pos) { + // move sign to low-order bit, and flip others if negative + n = (n << 1) ^ (n >> 31); + int start = pos; + if ((n & ~0x7F) != 0) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + } + } + } + } + buf[pos++] = (byte) n; + return pos - start; + } + + /** Encode a long to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 10 bytes left in the buffer before calling this method. + * @return The number of bytes written to the buffer, between 1 and 10. 
+ */ + public static int encodeLong(long n, byte[] buf, int pos) { + // move sign to low-order bit, and flip others if negative + n = (n << 1) ^ (n >> 63); + int start = pos; + if ((n & ~0x7FL) != 0) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + if (n > 0x7F) { + buf[pos++] = (byte)((n | 0x80) & 0xFF); + n >>>= 7; + } + } + } + } + } + } + } + } + } + buf[pos++] = (byte) n; + return pos - start; + } + + /** Encode a float to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 4 bytes left in the buffer before calling this method. + * @return Returns the number of bytes written to the buffer, 4. + */ + public static int encodeFloat(float f, byte[] buf, int pos) { + int len = 1; + int bits = Float.floatToRawIntBits(f); + // hotspot compiler works well with this variant + buf[pos] = (byte)((bits ) & 0xFF); + buf[pos + len++] = (byte)((bits >>> 8) & 0xFF); + buf[pos + len++] = (byte)((bits >>> 16) & 0xFF); + buf[pos + len++] = (byte)((bits >>> 24) & 0xFF); + return 4; + } + + /** Encode a double to the byte array at the given position. Will throw + * IndexOutOfBounds if it overflows. Users should ensure that there are at + * least 8 bytes left in the buffer before calling this method. + * @return Returns the number of bytes written to the buffer, 8. 
+ */ + public static int encodeDouble(double d, byte[] buf, int pos) { + long bits = Double.doubleToRawLongBits(d); + int first = (int)(bits & 0xFFFFFFFF); + int second = (int)((bits >>> 32) & 0xFFFFFFFF); + // the compiler seems to execute this order the best, likely due to + // register allocation -- the lifetime of constants is minimized. + buf[pos] = (byte)((first ) & 0xFF); + buf[pos + 4] = (byte)((second ) & 0xFF); + buf[pos + 5] = (byte)((second >>> 8) & 0xFF); + buf[pos + 1] = (byte)((first >>> 8) & 0xFF); + buf[pos + 2] = (byte)((first >>> 16) & 0xFF); + buf[pos + 6] = (byte)((second >>> 16) & 0xFF); + buf[pos + 7] = (byte)((second >>> 24) & 0xFF); + buf[pos + 3] = (byte)((first >>> 24) & 0xFF); + return 8; + } +} diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryEncoder.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryEncoder.java new file mode 100644 index 000000000..35125bb3c --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BinaryEncoder.java @@ -0,0 +1,98 @@ +package org.apache.avro.io.avro18; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.avro.util.Utf8; + +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. 
+ */ +public abstract class Avro18BinaryEncoder extends Avro18Encoder { + + @Override + public void writeNull() throws IOException {} + + @Override + public void writeString(Utf8 utf8) throws IOException { + this.writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); + } + + @Override + public void writeString(String string) throws IOException { + if (0 == string.length()) { + writeZero(); + return; + } + byte[] bytes = string.getBytes("UTF-8"); + writeInt(bytes.length); + writeFixed(bytes, 0, bytes.length); + } + + @Override + public void writeBytes(ByteBuffer bytes) throws IOException { + int len = bytes.limit() - bytes.position(); + if (0 == len) { + writeZero(); + } else { + writeInt(len); + writeFixed(bytes); + } + } + + @Override + public void writeBytes(byte[] bytes, int start, int len) throws IOException { + if (0 == len) { + writeZero(); + return; + } + this.writeInt(len); + this.writeFixed(bytes, start, len); + } + + @Override + public void writeEnum(int e) throws IOException { + this.writeInt(e); + } + + @Override + public void writeArrayStart() throws IOException {} + + @Override + public void setItemCount(long itemCount) throws IOException { + if (itemCount > 0) { + this.writeLong(itemCount); + } + } + + @Override + public void startItem() throws IOException {} + + @Override + public void writeArrayEnd() throws IOException { + writeZero(); + } + + @Override + public void writeMapStart() throws IOException {} + + @Override + public void writeMapEnd() throws IOException { + writeZero(); + } + + @Override + public void writeIndex(int unionIndex) throws IOException { + writeInt(unionIndex); + } + + /** Write a zero byte to the underlying output. **/ + protected abstract void writeZero() throws IOException; + + /** + * Returns the number of bytes currently buffered by this encoder. If this + * Encoder does not buffer, this will always return zero. + * + * Call {@link #flush()} to empty the buffer to the underlying output. 
+ */ + public abstract int bytesBuffered(); +} diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BufferedBinaryEncoder.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BufferedBinaryEncoder.java new file mode 100644 index 000000000..f7be8895b --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18BufferedBinaryEncoder.java @@ -0,0 +1,242 @@ +package org.apache.avro.io.avro18; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; + +import org.apache.avro.AvroRuntimeException; + + +/** + * An {@link Encoder} for Avro's binary encoding. + *

+ * This implementation buffers output to enhance performance. + * Output may not appear on the underlying output until flush() is called. + *

+ * {@literal DirectBinaryEncoder} can be used in place of this implementation if + * the buffering semantics are not desired, and the performance difference + * is acceptable. + *

+ * To construct or reconfigure, use + * {@literal EncoderFactory#binaryEncoder(OutputStream, BinaryEncoder)}. + *

+ * To change the buffer size, configure the factory instance used to + * create instances with {@literal EncoderFactory#configureBufferSize(int)} + * @see Encoder + */ +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. + */ +public class Avro18BufferedBinaryEncoder extends Avro18BinaryEncoder { + /** + * Default buffer size defined in {@literal EncoderFactory} of Avro-1.8 + */ + public static final int DEFAULT_BUFFER_SIZE = 2048; + + private byte[] buf; + private int pos; + private ByteSink sink; + private int bulkLimit; + + public Avro18BufferedBinaryEncoder(OutputStream out) { + this(out, DEFAULT_BUFFER_SIZE); + } + + public Avro18BufferedBinaryEncoder(OutputStream out, int bufferSize) { + configure(out, bufferSize); + } + + Avro18BufferedBinaryEncoder configure(OutputStream out, int bufferSize) { + if (null == out) + throw new NullPointerException("OutputStream cannot be null!"); + if (null != this.sink) { + if ( pos > 0) { + try { + flushBuffer(); + } catch (IOException e) { + throw new AvroRuntimeException("Failure flushing old output", e); + } + } + } + this.sink = new OutputStreamSink(out); + pos = 0; + if (null == buf || buf.length != bufferSize) { + buf = new byte[bufferSize]; + } + bulkLimit = buf.length >>> 1; + if (bulkLimit > 512) { + bulkLimit = 512; + } + return this; + } + + public void init(OutputStream out) throws IOException { + flush(); + configure(out, DEFAULT_BUFFER_SIZE); + } + + @Override + public void flush() throws IOException { + flushBuffer(); + sink.innerFlush(); + } + + /** Flushes the internal buffer to the underlying output. + * Does not flush the underlying output. + */ + private void flushBuffer() throws IOException { + if (pos > 0) { + sink.innerWrite(buf, 0, pos); + pos = 0; + } + } + + /** Ensures that the buffer has at least num bytes free to write to between its + * current position and the end. 
This will not expand the buffer larger than + * its current size, for writes larger than or near to the size of the buffer, + * we flush the buffer and write directly to the output, bypassing the buffer. + * @param num + * @throws IOException + */ + private void ensureBounds(int num) throws IOException { + int remaining = buf.length - pos; + if (remaining < num) { + flushBuffer(); + } + } + + @Override + public void writeBoolean(boolean b) throws IOException { + // inlined, shorter version of ensureBounds + if (buf.length == pos) { + flushBuffer(); + } + pos += Avro18BinaryData.encodeBoolean(b, buf, pos); + } + + @Override + public void writeInt(int n) throws IOException { + ensureBounds(5); + pos += Avro18BinaryData.encodeInt(n, buf, pos); + } + + @Override + public void writeLong(long n) throws IOException { + ensureBounds(10); + pos += Avro18BinaryData.encodeLong(n, buf, pos); + } + + @Override + public void writeFloat(float f) throws IOException { + ensureBounds(4); + pos += Avro18BinaryData.encodeFloat(f, buf, pos); + } + + @Override + public void writeDouble(double d) throws IOException { + ensureBounds(8); + pos += Avro18BinaryData.encodeDouble(d, buf, pos); + } + + @Override + public void writeFixed(byte[] bytes, int start, int len) throws IOException { + if (len > bulkLimit) { + //too big, write direct + flushBuffer(); + sink.innerWrite(bytes, start, len); + return; + } + ensureBounds(len); + System.arraycopy(bytes, start, buf, pos, len); + pos += len; + } + + @Override + public void writeFixed(ByteBuffer bytes) throws IOException { + if (!bytes.hasArray() && bytes.remaining() > bulkLimit) { + flushBuffer(); + sink.innerWrite(bytes); // bypass the buffer + } else { + super.writeFixed(bytes); + } + } + + @Override + protected void writeZero() throws IOException { + writeByte(0); + } + + private void writeByte(int b) throws IOException { + if (pos == buf.length) { + flushBuffer(); + } + buf[pos++] = (byte) (b & 0xFF); + } + + @Override + public int 
bytesBuffered() { + return pos; + } + + /** + * ByteSink abstracts the destination of written data from the core workings + * of BinaryEncoder. + *

+ * Currently the only destination option is an OutputStream, but we may later + * want to handle other constructs or specialize for certain OutputStream + * Implementations such as ByteBufferOutputStream. + *

+ */ + private abstract static class ByteSink { + protected ByteSink() {} + /** Write data from bytes, starting at off, for len bytes **/ + protected abstract void innerWrite(byte[] bytes, int off, int len) throws IOException; + + protected abstract void innerWrite(ByteBuffer buff) throws IOException; + + /** Flush the underlying output, if supported **/ + protected abstract void innerFlush() throws IOException; + } + + static class OutputStreamSink extends ByteSink { + private final OutputStream out; + private final WritableByteChannel channel; + private OutputStreamSink(OutputStream out) { + super(); + this.out = out; + channel = Channels.newChannel(out); + } + @Override + protected void innerWrite(byte[] bytes, int off, int len) + throws IOException { + out.write(bytes, off, len); + } + @Override + protected void innerFlush() throws IOException { + out.flush(); + } + @Override + protected void innerWrite(ByteBuffer buff) throws IOException { + channel.write(buff); + } + } +} diff --git a/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18Encoder.java b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18Encoder.java new file mode 100644 index 000000000..42837e4a1 --- /dev/null +++ b/avro-migration-helper/src/main/java/org/apache/avro/io/avro18/Avro18Encoder.java @@ -0,0 +1,264 @@ +package org.apache.avro.io.avro18; + +import java.io.Flushable; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.avro.AvroTypeException; +import org.apache.avro.io.Encoder; +import org.apache.avro.util.Utf8; + +/** + * Back-port {@literal BufferedBinaryEncoder} from Avro-1.8, so that Avro-1.4 could use it to improve serialization performance. + */ +public abstract class Avro18Encoder extends Encoder implements Flushable { + + /** + * "Writes" a null value. (Doesn't actually write anything, but + * advances the state of the parser if this class is stateful.) 
+ * @throws AvroTypeException If this is a stateful writer and a + * null is not expected + */ + public abstract void writeNull() throws IOException; + + /** + * Write a boolean value. + * @throws AvroTypeException If this is a stateful writer and a + * boolean is not expected + */ + public abstract void writeBoolean(boolean b) throws IOException; + + /** + * Writes a 32-bit integer. + * @throws AvroTypeException If this is a stateful writer and an + * integer is not expected + */ + public abstract void writeInt(int n) throws IOException; + + /** + * Write a 64-bit integer. + * @throws AvroTypeException If this is a stateful writer and a + * long is not expected + */ + public abstract void writeLong(long n) throws IOException; + + /** Write a float. + * @throws IOException + * @throws AvroTypeException If this is a stateful writer and a + * float is not expected + */ + public abstract void writeFloat(float f) throws IOException; + + /** + * Write a double. + * @throws AvroTypeException If this is a stateful writer and a + * double is not expected + */ + public abstract void writeDouble(double d) throws IOException; + + /** + * Write a Unicode character string. + * @throws AvroTypeException If this is a stateful writer and a + * char-string is not expected + */ + public abstract void writeString(Utf8 utf8) throws IOException; + + /** + * Write a Unicode character string. The default implementation converts + * the String to a {@link org.apache.avro.util.Utf8}. Some Encoder + * implementations may want to do something different as a performance optimization. + * @throws AvroTypeException If this is a stateful writer and a + * char-string is not expected + */ + public void writeString(String str) throws IOException { + writeString(new Utf8(str)); + } + + /** + * Write a Unicode character string. If the CharSequence is an + * {@link org.apache.avro.util.Utf8} it writes this directly, otherwise + * the CharSequence is converted to a String via toString() and written. 
+ * @throws AvroTypeException If this is a stateful writer and a + * char-string is not expected + */ + public void writeString(CharSequence charSequence) throws IOException { + if (charSequence instanceof Utf8) + writeString((Utf8)charSequence); + else + writeString(charSequence.toString()); + } + + /** + * Write a byte string. + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + */ + public abstract void writeBytes(ByteBuffer bytes) throws IOException; + + /** + * Write a byte string. + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + */ + public abstract void writeBytes(byte[] bytes, int start, int len) throws IOException; + + /** + * Writes a byte string. + * Equivalent to writeBytes(bytes, 0, bytes.length) + * @throws IOException + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + */ + public void writeBytes(byte[] bytes) throws IOException { + writeBytes(bytes, 0, bytes.length); + } + + /** + * Writes a fixed size binary object. + * @param bytes The contents to write + * @param start The position within bytes where the contents + * start. + * @param len The number of bytes to write. + * @throws AvroTypeException If this is a stateful writer and a + * byte-string is not expected + * @throws IOException + */ + public abstract void writeFixed(byte[] bytes, int start, int len) throws IOException; + + /** + * A shorthand for writeFixed(bytes, 0, bytes.length) + * @param bytes + */ + public void writeFixed(byte[] bytes) throws IOException { + writeFixed(bytes, 0, bytes.length); + } + + /** Writes a fixed from a ByteBuffer. 
*/ + public void writeFixed(ByteBuffer bytes) throws IOException { + int pos = bytes.position(); + int len = bytes.limit() - pos; + if (bytes.hasArray()) { + writeFixed(bytes.array(), bytes.arrayOffset() + pos, len); + } else { + byte[] b = new byte[len]; + bytes.duplicate().get(b, 0, len); + writeFixed(b, 0, len); + } + } + + /** + * Writes an enumeration. + * @param e + * @throws AvroTypeException If this is a stateful writer and an enumeration + * is not expected or the e is out of range. + * @throws IOException + */ + public abstract void writeEnum(int e) throws IOException; + + /** Call this method to start writing an array. + * + * When starting to serialize an array, call {@link + * #writeArrayStart}. Then, before writing any data for any item + * call {@link #setItemCount} followed by a sequence of + * {@link #startItem()} and the item itself. The number of + * {@link #startItem()} should match the number specified in + * {@link #setItemCount}. + * When actually writing the data of the item, you can call any {@link + * Encoder} method (e.g., {@link #writeLong}). When all items + * of the array have been written, call {@link #writeArrayEnd}. + * + * As an example, let's say you want to write an array of records, + * the record consisting of an Long field and a Boolean field. + * Your code would look something like this: + *

+   *  out.writeArrayStart();
+   *  out.setItemCount(list.size());
+   *  for (Record r : list) {
+   *    out.startItem();
+   *    out.writeLong(r.longField);
+   *    out.writeBoolean(r.boolField);
+   *  }
+   *  out.writeArrayEnd();
+   *  
+ * @throws AvroTypeException If this is a stateful writer and an + * array is not expected + */ + public abstract void writeArrayStart() throws IOException; + + /** + * Call this method before writing a batch of items in an array or a map. + * Then for each item, call {@link #startItem()} followed by any of the + * other write methods of {@link Encoder}. The number of calls + * to {@link #startItem()} must be equal to the count specified + * in {@link #setItemCount}. Once a batch is completed you + * can start another batch with {@link #setItemCount}. + * + * @param itemCount The number of {@link #startItem()} calls to follow. + * @throws IOException + */ + public abstract void setItemCount(long itemCount) throws IOException; + + /** + * Start a new item of an array or map. + * See {@link #writeArrayStart} for usage information. + * @throws AvroTypeException If called outside of an array or map context + */ + public abstract void startItem() throws IOException; + + /** + * Call this method to finish writing an array. + * See {@link #writeArrayStart} for usage information. + * + * @throws AvroTypeException If items written does not match count + * provided to {@link #writeArrayStart} + * @throws AvroTypeException If not currently inside an array + */ + public abstract void writeArrayEnd() throws IOException; + + /** + * Call this to start a new map. See + * {@link #writeArrayStart} for details on usage. + * + * As an example of usage, let's say you want to write a map of + * records, the record consisting of an Long field and a Boolean + * field. Your code would look something like this: + *
+   * out.writeMapStart();
+   * out.setItemCount(map.size());
+   * for (Map.Entry<String,Record> entry : map.entrySet()) {
+   *   out.startItem();
+   *   out.writeString(entry.getKey());
+   *   out.writeLong(entry.getValue().longField);
+   *   out.writeBoolean(entry.getValue().boolField);
+   * }
+   * out.writeMapEnd();
+   * 
+ * @throws AvroTypeException If this is a stateful writer and a + * map is not expected + */ + public abstract void writeMapStart() throws IOException; + + /** + * Call this method to terminate the inner-most, currently-opened + * map. See {@link #writeArrayStart} for more details. + * + * @throws AvroTypeException If items written does not match count + * provided to {@link #writeMapStart} + * @throws AvroTypeException If not currently inside a map + */ + public abstract void writeMapEnd() throws IOException; + + /** Call this method to write the tag of a union. + * + * As an example of usage, let's say you want to write a union, + * whose second branch is a record consisting of an Long field and + * a Boolean field. Your code would look something like this: + *
+   * out.writeIndex(1);
+   * out.writeLong(record.longField);
+   * out.writeBoolean(record.boolField);
+   * 
+ * @throws AvroTypeException If this is a stateful writer and a + * union is not expected + */ + public abstract void writeIndex(int unionIndex) throws IOException; +}