diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java index e250998bd..65a96149d 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericDeserializerGeneratorTest.java @@ -90,7 +90,7 @@ public void prepare() throws Exception { classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()}, FastGenericDeserializerGeneratorTest.class.getClassLoader()); - // In order to test the functionallity of the record split we set an unusually low number + // In order to test the functionality of the record split we set an unusually low number FastGenericDeserializerGenerator.setFieldsPerPopulationMethod(2); } diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericSerializerGeneratorTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericSerializerGeneratorTest.java index 76513e44a..9c766cc07 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericSerializerGeneratorTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastGenericSerializerGeneratorTest.java @@ -44,6 +44,9 @@ public void prepare() throws Exception { classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()}, FastGenericSerializerGeneratorTest.class.getClassLoader()); + + // In order to test the functionality of the record split we set an unusually low number + FastGenericSerializerGenerator.setFieldsPerRecordSerializationMethod(2); } @Test(groups = {"serializationTest"}) @@ -529,6 +532,26 @@ public long getPrimitive(int index) 
{ Assert.assertTrue(primitiveApiCalled.get()); } + @Test(groups = {"serializationTest"}) + public void shouldSerializeLargeRecord() { + int fieldCnt = 1000; + String fieldNamePrefix = "field_"; + Schema.Field[] fields = new Schema.Field[fieldCnt]; + for (int i = 0; i < fieldCnt; ++i) { + fields[i] = createPrimitiveFieldSchema(fieldNamePrefix + i, Schema.Type.INT); + } + Schema recordSchema = createRecord(fields); + GenericRecord record = new GenericData.Record(recordSchema); + for (int i = 0; i < fieldCnt; ++i) { + record.put(fieldNamePrefix + i, Integer.valueOf(i)); + } + + GenericRecord decodedRecord = decodeRecord(recordSchema, dataAsBinaryDecoder(record, recordSchema)); + for (int i = 0; i < fieldCnt; ++i) { + Assert.assertEquals(decodedRecord.get(fieldNamePrefix + i), Integer.valueOf(i)); + } + } + private void shouldWriteArrayOfPrimitives(Schema.Type elementType, List data) { // given Schema elementSchema = Schema.create(elementType); diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSpecificSerializerGeneratorTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSpecificSerializerGeneratorTest.java index c3fb06a6d..81257dde1 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSpecificSerializerGeneratorTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastSpecificSerializerGeneratorTest.java @@ -75,6 +75,9 @@ public void prepare() throws Exception { classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()}, FastSpecificSerializerGeneratorTest.class.getClassLoader()); + + // In order to test the functionality of the record split we set an unusually low number + FastGenericSerializerGenerator.setFieldsPerRecordSerializationMethod(2); } @Test(groups = {"serializationTest"}) diff --git 
a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java index 53afab35f..187311394 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java @@ -59,6 +59,9 @@ public void prepare() throws Exception { tempDir = getCodeGenDirectory(); classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()}, FastDeserializerDefaultsTest.class.getClassLoader()); + + // In order to test the functionality of the record split we set an unusually low number + FastGenericSerializerGenerator.setFieldsPerRecordSerializationMethod(2); } StringableRecord generateRecord(URL exampleURL, URI exampleURI, File exampleFile, BigDecimal exampleBigDecimal, diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java index 7463dd869..031a571d1 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java @@ -57,7 +57,7 @@ public class FastDeserializerGenerator extends FastDeserializerGeneratorBase< private static final Logger LOGGER = LoggerFactory.getLogger(FastDeserializerGenerator.class); private static final String DECODER = "decoder"; private static final String VAR_NAME_FOR_REUSE = "reuse"; - private static int FIELDS_PER_POPULATION_METHOD = 100; + private static int FIELDS_PER_POPULATION_METHOD = 20; // 65535 is the actual limit, 65K added for safety static int MAX_LENGTH_OF_STRING_LITERAL = 65000; diff --git 
a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java index fe4c97534..2aeb02315 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java @@ -26,6 +26,8 @@ public class FastSerializerGenerator extends FastSerdeBase { + private static int FIELDS_PER_RECORD_SERIALIZATION_METHOD = 20; + private static final String ENCODER = "encoder"; protected final Schema schema; @@ -36,6 +38,10 @@ public class FastSerializerGenerator extends FastSerdeBase { */ private final Map enumSchemaVarMap = new HashMap<>(); + static void setFieldsPerRecordSerializationMethod(int fieldCount) { + FIELDS_PER_RECORD_SERIALIZATION_METHOD = fieldCount; + } + public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader, String compileClassPath) { @@ -134,20 +140,39 @@ private void processRecord(final Schema recordSchema, JExpression recordExpr, fi JMethod method = createMethod(recordSchema); containerBody.invoke(getMethod(recordSchema)).arg(recordExpr).arg(JExpr.direct(ENCODER)); - JBlock body = method.body(); + JBlock methodBody = method.body(); recordExpr = method.listParams()[0]; + int fieldCount = 0; + JBlock popMethodBody = methodBody; + for (Schema.Field field : recordSchema.getFields()) { + /** + * We roll over to a new serialization method for very large records; the initial fields are kept in the + * original outer method to maintain performance for smaller records. + */ + fieldCount++; + if (fieldCount % FIELDS_PER_RECORD_SERIALIZATION_METHOD == 0) { + JMethod popMethod = generatedClass.method(JMod.PRIVATE, codeModel.VOID, getUniqueName("serialize_" + StringUtils.capitalize(recordSchema.getName()))); + popMethod._throws(IOException.class); + 
popMethod.param(schemaAssistant.classFromSchema(recordSchema), "data"); + popMethod.param(Encoder.class, ENCODER); + popMethod.annotate(SuppressWarnings.class).param("value", "unchecked"); + + popMethodBody = popMethod.body(); + methodBody.invoke(popMethod).arg(recordExpr).arg(JExpr.direct(ENCODER)); + } + Schema fieldSchema = field.schema(); if (SchemaAssistant.isComplexType(fieldSchema)) { JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema); - JVar containerVar = declareValueVar(field.name(), fieldSchema, body); + JVar containerVar = declareValueVar(field.name(), fieldSchema, popMethodBody); JExpression valueExpression = JExpr.invoke(recordExpr, "get").arg(JExpr.lit(field.pos())); containerVar.init(JExpr.cast(fieldClass, valueExpression)); - processComplexType(fieldSchema, containerVar, body); + processComplexType(fieldSchema, containerVar, popMethodBody); } else { - processSimpleType(fieldSchema, recordExpr.invoke("get").arg(JExpr.lit(field.pos())), body); + processSimpleType(fieldSchema, recordExpr.invoke("get").arg(JExpr.lit(field.pos())), popMethodBody); } } }