diff --git a/README.md b/README.md index 276be3923..7115109d3 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ LinkedIn. | Version | Serialization | Deserialization | Fast Serialization | Fast Deserialization | | -------- | ------------- | --------------- | ------------------ | -------------------- | -| Avro 1.4 | Yes | Yes | No | Yes | +| Avro 1.4 | Yes | Yes | Yes | Yes | | Avro 1.5 | ??? | ??? | No | No | | Avro 1.6 | ??? | ??? | No | No | | Avro 1.7 | Yes | Yes | Yes | Yes | diff --git a/avro-fastserde/build.gradle b/avro-fastserde/build.gradle index 6f449ccc7..c86e8e8d7 100644 --- a/avro-fastserde/build.gradle +++ b/avro-fastserde/build.gradle @@ -74,10 +74,7 @@ for (String avroVersion : avroVersions) { task "testAvro${avroVersion}" (type: Test, group: "Verification", description: "runs unit tests with avro ${avroVersion}") { useTestNG() { - includeGroups "deserializationTest" - if (!avroVersion.equals("14")) { - includeGroups "serializationTest" - } + excludeGroups "perfTest" } testLogging { @@ -147,4 +144,4 @@ cleanupAndRebuildTestsForAvro18.dependsOn generateAvroClasses18 testAvro14.dependsOn cleanupAndRebuildTestsForAvro14 testAvro17.dependsOn cleanupAndRebuildTestsForAvro17 -testAvro18.dependsOn cleanupAndRebuildTestsForAvro18 \ No newline at end of file +testAvro18.dependsOn cleanupAndRebuildTestsForAvro18 diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java index 3709716c2..bbe7f7b32 100644 --- a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java +++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java @@ -6,6 +6,7 @@ import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; +import com.sun.codemodel.JFieldVar; import com.sun.codemodel.JForEach; import com.sun.codemodel.JForLoop; import 
com.sun.codemodel.JMethod; @@ -15,8 +16,11 @@ import java.io.File; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.avro.Schema; import org.apache.avro.io.Encoder; import org.apache.avro.util.Utf8; @@ -31,11 +35,21 @@ public class FastSerializerGenerator extends FastSerializerGeneratorBase { private final Map serializeMethodMap = new HashMap<>(); private final SchemaAssistant schemaAssistant; + /** + * Enum schema mapping for Avro-1.4. + */ + private JFieldVar enumSchemaMapField; + /** + * This field is used to decide whether the corresponding schema is already in {@link #enumSchemaMapField} or not. + */ + private final Set enumSchemaIdSet = new HashSet<>(); + + public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader, String compileClassPath) { super(schema, destination, classLoader, compileClassPath); this.useGenericTypes = useGenericTypes; - this.schemaAssistant = new SchemaAssistant(codeModel, useGenericTypes); + this.schemaAssistant = new SchemaAssistantForSerializer(codeModel, useGenericTypes); } @Override @@ -46,10 +60,24 @@ public FastSerializer generateSerializer() { try { serializerClass = classPackage._class(className); + if (Utils.isAvro14()) { + /** + * In Avro-1.4, there is no way to infer/extract enum schema from {@link org.apache.avro.generic.GenericData.EnumSymbol}, so + * the serializer needs to maintain a mapping between the schema id and the actual {@link org.apache.avro.Schema.EnumSchema}, + * and get the enum id from the corresponding EnumSchema in {@link #processEnum(Schema, JExpression, JBlock)}. 
+ */ + enumSchemaMapField = + serializerClass.field( + JMod.PRIVATE, + codeModel.ref(Map.class).narrow(Long.class).narrow(Schema.class), + "enumSchemaMap", + JExpr._new(codeModel.ref(ConcurrentHashMap.class).narrow(Long.class).narrow(Schema.class))); + } + final JMethod serializeMethod = serializerClass.method(JMod.PUBLIC, void.class, "serialize"); final JVar serializeMethodParam; - JClass outputClass = classFromSchemaForSerializer(schema); + JClass outputClass = schemaAssistant.classFromSchema(schema); serializerClass._implements(codeModel.ref(FastSerializer.class).narrow(outputClass)); serializeMethodParam = serializeMethod.param(outputClass, "data"); @@ -126,7 +154,7 @@ private void processRecord(final Schema recordSchema, JExpression recordExpr, fi for (Schema.Field field : recordSchema.getFields()) { Schema fieldSchema = field.schema(); if (SchemaAssistant.isComplexType(fieldSchema)) { - JClass fieldClass = classFromSchemaForSerializer(fieldSchema); + JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema); JVar containerVar = declareValueVar(field.name(), fieldSchema, body); JExpression valueExpression = JExpr.invoke(recordExpr, "get").arg(JExpr.lit(field.pos())); containerVar.init(JExpr.cast(fieldClass, valueExpression)); @@ -138,37 +166,8 @@ private void processRecord(final Schema recordSchema, JExpression recordExpr, fi } } - /** - * Special handling for "String" type since the underlying data could be "String" or "Utf8". - * - * This is different from the de-serializer since Avro will always decode it into "Utf8". 
- * @param fieldClass - * @return - */ - private JClass specialHandlingOfStringSchemaForSerializer(JClass fieldClass) { - if (fieldClass.equals(codeModel.ref(Utf8.class))) { - return codeModel.ref(CharSequence.class); - } - return fieldClass; - } - - private JClass classFromSchemaForSerializer(Schema fieldSchema) { - JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema); - return specialHandlingOfStringSchemaForSerializer(fieldClass); - } - - private JClass classFromSchemaForSerializer(Schema fieldSchema, boolean abstractType) { - JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema, abstractType); - return specialHandlingOfStringSchemaForSerializer(fieldClass); - } - - private JClass classFromSchemaForSerializer(Schema fieldSchema, boolean abstractType, boolean rawType) { - JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema, abstractType, rawType); - return specialHandlingOfStringSchemaForSerializer(fieldClass); - } - private void processArray(final Schema arraySchema, JExpression arrayExpr, JBlock body) { - final JClass arrayClass = classFromSchemaForSerializer(arraySchema); + final JClass arrayClass = schemaAssistant.classFromSchema(arraySchema); body.invoke(JExpr.direct(ENCODER), "writeArrayStart"); final JExpression emptyArrayCondition = arrayExpr.eq(JExpr._null()).cor(JExpr.invoke(arrayExpr, "isEmpty")); @@ -199,9 +198,9 @@ private void processArray(final Schema arraySchema, JExpression arrayExpr, JBloc private void processMap(final Schema mapSchema, JExpression mapExpr, JBlock body) { - final JClass mapClass = classFromSchemaForSerializer(mapSchema); + final JClass mapClass = schemaAssistant.classFromSchema(mapSchema); - JClass keyClass = specialHandlingOfStringSchemaForSerializer(schemaAssistant.keyClassFromMapSchema(mapSchema)); + JClass keyClass = schemaAssistant.keyClassFromMapSchema(mapSchema); body.invoke(JExpr.direct(ENCODER), "writeMapStart"); @@ -280,8 +279,8 @@ private void processUnion(final Schema unionSchema, 
JExpression unionExpr, JBloc continue; } - JClass optionClass = classFromSchemaForSerializer(schemaOption); - JClass rawOptionClass = classFromSchemaForSerializer(schemaOption, true, true); + JClass optionClass = schemaAssistant.classFromSchema(schemaOption); + JClass rawOptionClass = schemaAssistant.classFromSchema(schemaOption, true, true); JExpression condition = unionExpr._instanceof(rawOptionClass); if (useGenericTypes && SchemaAssistant.isNamedType(schemaOption)) { condition = condition.cand(JExpr.invoke(JExpr.lit(schemaOption.getFullName()), "equals") @@ -304,18 +303,41 @@ private void processUnion(final Schema unionSchema, JExpression unionExpr, JBloc } private void processFixed(Schema fixedSchema, JExpression fixedValueExpression, JBlock body) { - JClass fixedClass = classFromSchemaForSerializer(fixedSchema); + JClass fixedClass = schemaAssistant.classFromSchema(fixedSchema); body.invoke(JExpr.direct(ENCODER), "writeFixed") .arg(JExpr.invoke(JExpr.cast(fixedClass, fixedValueExpression), "bytes")); } private void processEnum(Schema enumSchema, JExpression enumValueExpression, JBlock body) { - JClass enumClass = classFromSchemaForSerializer(enumSchema); + JClass enumClass = schemaAssistant.classFromSchema(enumSchema); JExpression enumValueCasted = JExpr.cast(enumClass, enumValueExpression); JExpression valueToWrite; if (useGenericTypes) { - valueToWrite = - JExpr.invoke(enumValueCasted.invoke("getSchema"), "getEnumOrdinal").arg(enumValueCasted.invoke("toString")); + if (Utils.isAvro14()) { + /** + * Register/retrieve the corresponding {@link org.apache.avro.Schema.EnumSchema} from the mapping. 
+ */ + long enumSchemaFingerprint = Utils.getSchemaFingerprint(enumSchema); + if (enumSchemaIdSet.contains(enumSchemaFingerprint)) { + valueToWrite = JExpr.invoke( + enumSchemaMapField.invoke("get").arg(JExpr.lit(enumSchemaFingerprint)), + "getEnumOrdinal" + ).arg(enumValueCasted.invoke("toString")); + } else { + enumSchemaIdSet.add(enumSchemaFingerprint); + JVar enumSchemaVar = body.decl(codeModel.ref(Schema.class), + getVariableName(enumSchema.getName() + "EnumSchema"), + codeModel.ref(Schema.class).staticInvoke("parse").arg(enumSchema.toString()) + ); + body.invoke(enumSchemaMapField, "put").arg(JExpr.lit(enumSchemaFingerprint)).arg(enumSchemaVar); + valueToWrite = JExpr.invoke(enumSchemaVar, "getEnumOrdinal").arg(enumValueCasted.invoke("toString")); + } + } else { + valueToWrite = JExpr.invoke( + enumValueCasted.invoke("getSchema"), + "getEnumOrdinal" + ).arg(enumValueCasted.invoke("toString")); + } } else { valueToWrite = enumValueCasted.invoke("ordinal"); } @@ -339,7 +361,7 @@ private void processString(final Schema primitiveSchema, JExpression primitiveVa private void processPrimitive(final Schema primitiveSchema, JExpression primitiveValueExpression, JBlock body) { String writeFunction; - JClass primitiveClass = classFromSchemaForSerializer(primitiveSchema); + JClass primitiveClass = schemaAssistant.classFromSchema(primitiveSchema); JExpression castedValue = JExpr.cast(primitiveClass, primitiveValueExpression); switch (primitiveSchema.getType()) { case STRING: @@ -373,7 +395,7 @@ private void processPrimitive(final Schema primitiveSchema, JExpression primitiv private JVar declareValueVar(final String name, final Schema schema, JBlock block) { if (SchemaAssistant.isComplexType(schema)) { - return block.decl(classFromSchemaForSerializer(schema, true), getVariableName(StringUtils.uncapitalize(name)), + return block.decl(schemaAssistant.classFromSchema(schema, true), getVariableName(StringUtils.uncapitalize(name)), JExpr._null()); } else { throw new 
FastDeserializerGeneratorException("Incorrect container variable: " + schema.getType()); //.getName()); @@ -400,7 +422,7 @@ private JMethod createMethod(final Schema schema) { JMethod method = serializerClass.method(JMod.PUBLIC, codeModel.VOID, "serialize" + schema.getName() + nextUniqueInt()); method._throws(IOException.class); - method.param(classFromSchemaForSerializer(schema), "data"); + method.param(schemaAssistant.classFromSchema(schema), "data"); method.param(Encoder.class, ENCODER); method.annotate(SuppressWarnings.class).param("value", "unchecked"); diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java index 109b2343f..be9885769 100644 --- a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java +++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistant.java @@ -47,6 +47,10 @@ public SchemaAssistant(JCodeModel codeModel, boolean useGenericTypes) { this.exceptionsFromStringable = new HashSet<>(); } + protected JCodeModel getCodeModel() { + return codeModel; + } + protected Set getUsedFullyQualifiedClassNameSet() { return fullyQualifiedClassNameSet; } @@ -67,9 +71,13 @@ public static boolean isComplexType(Schema schema) { public static boolean isNamedType(Schema schema) { switch (schema.getType()) { case RECORD: + return true; case ENUM: case FIXED: - return true; + /** + * This is used to avoid `getSchema` call since in Avro-1.4, `getSchema` method is not available for Enum and Fixed. + */ + return Utils.isAvro14() ? 
false : true; default: return false; } @@ -129,7 +137,7 @@ public JClass keyClassFromMapSchema(Schema schema) { extendExceptionsFromStringable(schema.getProp(KEY_CLASS_PROP)); return codeModel.ref(schema.getProp(KEY_CLASS_PROP)); } else { - return codeModel.ref(Utf8.class); + return defaultStringType(); } } @@ -249,7 +257,7 @@ public JClass classFromSchema(Schema schema, boolean abstractType, boolean rawTy outputClass = codeModel.ref(schema.getProp(CLASS_PROP)); extendExceptionsFromStringable(schema.getProp(CLASS_PROP)); } else { - outputClass = codeModel.ref(Utf8.class); + outputClass = defaultStringType(); } break; case BYTES: @@ -270,6 +278,10 @@ public JClass classFromSchema(Schema schema, boolean abstractType, boolean rawTy return outputClass; } + protected JClass defaultStringType() { + return codeModel.ref(Utf8.class); + } + public JExpression getEnumValueByName(Schema enumSchema, JExpression nameExpr, JInvocation getSchemaExpr) { if (useGenericTypes) { if (Utils.isAvro14()) { @@ -309,7 +321,7 @@ public JExpression getStringableValue(Schema schema, JExpression stringExpr) { if (isStringable(schema)) { return JExpr._new(classFromSchema(schema)).arg(stringExpr); } else { - return JExpr._new(codeModel.ref(Utf8.class)).arg(stringExpr); + return JExpr._new(defaultStringType()).arg(stringExpr); } } } diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistantForSerializer.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistantForSerializer.java new file mode 100644 index 000000000..cf3da1c83 --- /dev/null +++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/SchemaAssistantForSerializer.java @@ -0,0 +1,22 @@ +package com.linkedin.avro.fastserde; + +import com.sun.codemodel.JClass; +import com.sun.codemodel.JCodeModel; + + +public class SchemaAssistantForSerializer extends SchemaAssistant { + public SchemaAssistantForSerializer(JCodeModel codeModel, boolean useGenericTypes) { + super(codeModel, 
useGenericTypes); + } + + /** + * Special handling for "String" type since the underlying data could be "String" or "Utf8". + * + * This is different from the de-serializer since Avro will always decode it into "Utf8". + * @return the {@link JClass} reference for {@link CharSequence} + */ + @Override + protected JClass defaultStringType() { + return getCodeModel().ref(CharSequence.class); + } +} diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java index f1baf3625..729ac8ed6 100644 --- a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java +++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java @@ -80,6 +80,7 @@ public static Long getSchemaFingerprint(Schema schema) { AVRO_VERSIONS_SUPPORTED_FOR_DESERIALIZER.add(AvroVersion.AVRO_1_7); AVRO_VERSIONS_SUPPORTED_FOR_DESERIALIZER.add(AvroVersion.AVRO_1_8); + AVRO_VERSIONS_SUPPORTED_FOR_SERIALIZER.add(AvroVersion.AVRO_1_4); AVRO_VERSIONS_SUPPORTED_FOR_SERIALIZER.add(AvroVersion.AVRO_1_7); AVRO_VERSIONS_SUPPORTED_FOR_SERIALIZER.add(AvroVersion.AVRO_1_8); } diff --git a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java index 78301d850..f56e8597d 100644 --- a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java +++ b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java @@ -108,16 +108,31 @@ public void serializeStringableFields() throws URISyntaxException, MalformedURLE specificDataFromDecoder(StringableRecord.SCHEMA$, writeWithFastAvro(record, StringableRecord.SCHEMA$)); // then - Assert.assertEquals(exampleBigDecimal, afterDecoding.bigdecimal); - Assert.assertEquals(exampleBigInteger, afterDecoding.biginteger); - Assert.assertEquals(exampleFile, afterDecoding.file); - Assert.assertEquals(Collections.singletonList(exampleURL), afterDecoding.urlArray); - 
Assert.assertEquals(Collections.singletonMap(exampleURL, exampleBigInteger), afterDecoding.urlMap); - Assert.assertNotNull(afterDecoding.subRecord); - Assert.assertEquals(exampleURI, afterDecoding.subRecord.uriField); - Assert.assertNotNull(afterDecoding.subRecordWithSubRecord); - Assert.assertNotNull(afterDecoding.subRecordWithSubRecord.subRecord); - Assert.assertEquals(exampleURI, afterDecoding.subRecordWithSubRecord.subRecord.uriField); + if (Utils.isAvro14()) { + Assert.assertEquals(exampleBigDecimal.toString(), afterDecoding.bigdecimal.toString()); + Assert.assertEquals(exampleBigInteger.toString(), afterDecoding.biginteger.toString()); + Assert.assertEquals(exampleFile.toString(), afterDecoding.file.toString()); + Assert.assertEquals(Collections.singletonList(new Utf8(exampleURL.toString())), afterDecoding.urlArray); + Assert.assertEquals( + Collections.singletonMap(new Utf8(exampleURL.toString()), new Utf8(exampleBigInteger.toString())), + afterDecoding.urlMap); + Assert.assertNotNull(afterDecoding.subRecord); + Assert.assertEquals(exampleURI.toString(), afterDecoding.subRecord.uriField.toString()); + Assert.assertNotNull(afterDecoding.subRecordWithSubRecord); + Assert.assertNotNull(afterDecoding.subRecordWithSubRecord.subRecord); + Assert.assertEquals(exampleURI.toString(), afterDecoding.subRecordWithSubRecord.subRecord.uriField.toString()); + } else { + Assert.assertEquals(exampleBigDecimal, afterDecoding.bigdecimal); + Assert.assertEquals(exampleBigInteger, afterDecoding.biginteger); + Assert.assertEquals(exampleFile, afterDecoding.file); + Assert.assertEquals(Collections.singletonList(exampleURL), afterDecoding.urlArray); + Assert.assertEquals(Collections.singletonMap(exampleURL, exampleBigInteger), afterDecoding.urlMap); + Assert.assertNotNull(afterDecoding.subRecord); + Assert.assertEquals(exampleURI, afterDecoding.subRecord.uriField); + Assert.assertNotNull(afterDecoding.subRecordWithSubRecord); + 
Assert.assertNotNull(afterDecoding.subRecordWithSubRecord.subRecord); + Assert.assertEquals(exampleURI, afterDecoding.subRecordWithSubRecord.subRecord.uriField); + } } } diff --git a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/SerDeMicroBenchmark.java b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/SerDeMicroBenchmark.java index bb41ca476..c1ae5c025 100644 --- a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/SerDeMicroBenchmark.java +++ b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/micro/benchmark/SerDeMicroBenchmark.java @@ -59,7 +59,7 @@ public static byte[] serializedTestObjects(int recordCnt) throws Exception { return serializer.serializeObjects(objs); } - @Test(invocationCount = 5) + @Test(invocationCount = 5, groups = {"perfTest"}) public void testAvroSerialization() throws Exception { long startInMs = System.currentTimeMillis(); @@ -75,7 +75,7 @@ public void testAvroSerialization() throws Exception { System.out.println("Regular avro serialization latency: " + (System.currentTimeMillis() - startInMs) + " ms"); } - @Test(invocationCount = 5) + @Test(invocationCount = 5, groups = {"perfTest"}) public void testFastAvroSerialization() throws Exception { long startInMs = System.currentTimeMillis(); @@ -92,7 +92,7 @@ public void testFastAvroSerialization() throws Exception { System.out.println("Fast avro serialization latency: " + (System.currentTimeMillis() - startInMs) + " ms"); } - @Test(invocationCount = 5) + @Test(invocationCount = 5, groups = {"perfTest"}) public void testAvroDeserialization() throws Exception { byte[] serializedBytes = serializedTestObjects(1000); long startInMs = System.currentTimeMillis(); @@ -104,7 +104,7 @@ public void testAvroDeserialization() throws Exception { System.out.println("Regular avro deserialization latency: " + (System.currentTimeMillis() - startInMs) + " ms"); } - @Test(invocationCount = 5) + @Test(invocationCount = 5, groups = 
{"perfTest"}) public void testFastAvroDeserialization() throws Exception { byte[] serializedBytes = serializedTestObjects(1000); long startInMs = System.currentTimeMillis(); @@ -126,7 +126,7 @@ public Object[][] useFastAvroOptions() { return new Object[][]{{false}}; } - @Test(dataProvider = "useFastAvroOptionsProvider") + @Test(dataProvider = "useFastAvroOptionsProvider", groups = {"perfTest"}) public void testFastAvroWithMultiThread(boolean useFastAvro) throws Exception { byte[] serializedBytes = serializedTestObjects(1000); long startInMs = System.currentTimeMillis();