Skip to content

Commit

Permalink
[fast-avro] Added logic to split large method for fast serializer (#514)
Browse files Browse the repository at this point in the history
In the past, we updated the fast deserializer to support splitting large
methods; this change leverages similar logic to split large methods in
the fast serializer.
  • Loading branch information
gaojieliu authored Sep 8, 2023
1 parent c16ffa1 commit b5f3ce6
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void prepare() throws Exception {
classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()},
FastGenericDeserializerGeneratorTest.class.getClassLoader());

// In order to test the functionallity of the record split we set an unusually low number
// In order to test the functionality of the record split we set an unusually low number
FastGenericDeserializerGenerator.setFieldsPerPopulationMethod(2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public void prepare() throws Exception {

classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()},
FastGenericSerializerGeneratorTest.class.getClassLoader());

// In order to test the functionality of the record split we set an unusually low number
FastGenericSerializerGenerator.setFieldsPerRecordSerializationMethod(2);
}

@Test(groups = {"serializationTest"})
Expand Down Expand Up @@ -529,6 +532,26 @@ public long getPrimitive(int index) {
Assert.assertTrue(primitiveApiCalled.get());
}

/**
 * Serializes a record with 1000 int fields and verifies that every field holds its
 * original value after the serialized bytes are decoded back into a record.
 */
@Test(groups = {"serializationTest"})
public void shouldSerializeLargeRecord() {
  // given
  int fieldCnt = 1000;
  String fieldNamePrefix = "field_";
  Schema.Field[] fields = new Schema.Field[fieldCnt];
  for (int i = 0; i < fieldCnt; ++i) {
    fields[i] = createPrimitiveFieldSchema(fieldNamePrefix + i, Schema.Type.INT);
  }
  Schema recordSchema = createRecord(fields);
  GenericRecord record = new GenericData.Record(recordSchema);
  for (int i = 0; i < fieldCnt; ++i) {
    // Integer.valueOf replaces the deprecated Integer(int) constructor.
    record.put(fieldNamePrefix + i, Integer.valueOf(i));
  }

  // when
  GenericRecord decodedRecord = decodeRecord(recordSchema, dataAsBinaryDecoder(record, recordSchema));

  // then
  for (int i = 0; i < fieldCnt; ++i) {
    String fieldName = fieldNamePrefix + i;
    Assert.assertEquals(decodedRecord.get(fieldName), Integer.valueOf(i));
  }
}

private <E> void shouldWriteArrayOfPrimitives(Schema.Type elementType, List<E> data) {
// given
Schema elementSchema = Schema.create(elementType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public void prepare() throws Exception {

classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()},
FastSpecificSerializerGeneratorTest.class.getClassLoader());

// In order to test the functionality of the record split we set an unusually low number
FastGenericSerializerGenerator.setFieldsPerRecordSerializationMethod(2);
}

@Test(groups = {"serializationTest"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public void prepare() throws Exception {
tempDir = getCodeGenDirectory();
classLoader = URLClassLoader.newInstance(new URL[]{tempDir.toURI().toURL()},
FastDeserializerDefaultsTest.class.getClassLoader());

// In order to test the functionality of the record split we set an unusually low number
FastGenericSerializerGenerator.setFieldsPerRecordSerializationMethod(2);
}

StringableRecord generateRecord(URL exampleURL, URI exampleURI, File exampleFile, BigDecimal exampleBigDecimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class FastDeserializerGenerator<T> extends FastDeserializerGeneratorBase<
private static final Logger LOGGER = LoggerFactory.getLogger(FastDeserializerGenerator.class);
private static final String DECODER = "decoder";
private static final String VAR_NAME_FOR_REUSE = "reuse";
private static int FIELDS_PER_POPULATION_METHOD = 100;
private static int FIELDS_PER_POPULATION_METHOD = 20;

// 65535 is the actual limit; 65000 is used to leave a safety margin
static int MAX_LENGTH_OF_STRING_LITERAL = 65000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public class FastSerializerGenerator<T> extends FastSerdeBase {

private static int FIELDS_PER_RECORD_SERIALIZATION_METHOD = 20;

private static final String ENCODER = "encoder";
protected final Schema schema;

Expand All @@ -36,6 +38,10 @@ public class FastSerializerGenerator<T> extends FastSerdeBase {
*/
private final Map<Integer, JVar> enumSchemaVarMap = new HashMap<>();

/**
 * Overrides the number of record fields after which record serialization rolls over
 * into a newly generated method.
 *
 * @param fieldCount number of fields per generated serialization method; must be positive
 * @throws IllegalArgumentException if {@code fieldCount} is zero or negative
 */
static void setFieldsPerRecordSerializationMethod(int fieldCount) {
  if (fieldCount <= 0) {
    // The value is used as a modulo divisor while generating record serializers,
    // so zero would only surface later as an ArithmeticException; fail fast instead.
    throw new IllegalArgumentException("fieldCount must be positive, but got: " + fieldCount);
  }
  FIELDS_PER_RECORD_SERIALIZATION_METHOD = fieldCount;
}


public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader,
String compileClassPath) {
Expand Down Expand Up @@ -134,20 +140,39 @@ private void processRecord(final Schema recordSchema, JExpression recordExpr, fi
JMethod method = createMethod(recordSchema);
containerBody.invoke(getMethod(recordSchema)).arg(recordExpr).arg(JExpr.direct(ENCODER));

JBlock body = method.body();
JBlock methodBody = method.body();
recordExpr = method.listParams()[0];

int fieldCount = 0;
JBlock popMethodBody = methodBody;

for (Schema.Field field : recordSchema.getFields()) {
/**
* We roll the serialization method for very large records; the initial fields are kept in the outer
* method, as in the original, to maintain performance for smaller records.
*/
fieldCount++;
if (fieldCount % FIELDS_PER_RECORD_SERIALIZATION_METHOD == 0) {
JMethod popMethod = generatedClass.method(JMod.PRIVATE, codeModel.VOID, getUniqueName("serialize_" + StringUtils.capitalize(recordSchema.getName())));
popMethod._throws(IOException.class);
popMethod.param(schemaAssistant.classFromSchema(recordSchema), "data");
popMethod.param(Encoder.class, ENCODER);
popMethod.annotate(SuppressWarnings.class).param("value", "unchecked");

popMethodBody = popMethod.body();
methodBody.invoke(popMethod).arg(recordExpr).arg(JExpr.direct(ENCODER));
}

Schema fieldSchema = field.schema();
if (SchemaAssistant.isComplexType(fieldSchema)) {
JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema);
JVar containerVar = declareValueVar(field.name(), fieldSchema, body);
JVar containerVar = declareValueVar(field.name(), fieldSchema, popMethodBody);
JExpression valueExpression = JExpr.invoke(recordExpr, "get").arg(JExpr.lit(field.pos()));
containerVar.init(JExpr.cast(fieldClass, valueExpression));

processComplexType(fieldSchema, containerVar, body);
processComplexType(fieldSchema, containerVar, popMethodBody);
} else {
processSimpleType(fieldSchema, recordExpr.invoke("get").arg(JExpr.lit(field.pos())), body);
processSimpleType(fieldSchema, recordExpr.invoke("get").arg(JExpr.lit(field.pos())), popMethodBody);
}
}
}
Expand Down

0 comments on commit b5f3ce6

Please sign in to comment.