Skip to content

Commit

Permalink
Fixed a bug in processUnion of FastSerializer (#44)
Browse files Browse the repository at this point in the history
Previously, the code is checking schema type to decide the right index,
and this change was introduced to accommodate Avro-1.4, but this is
not appropriate since Union could contain multiple record types, which
will cause the writen index will always be the index of the first record.
The fix is to check the full name of the schema instead of type name.
  • Loading branch information
gaojieliu authored Apr 13, 2020
1 parent bd59deb commit e49d36e
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,10 @@ private Integer getIndexNamedForUnion(Schema unionSchema, Schema schema) {
throw new RuntimeException("Union schema expected, but received: " + unionSchema.getType());
}
List<Schema> subSchemas = unionSchema.getTypes();
String schemaFullName = SchemaAssistant.getSchemaFullName(schema);
int index = 0;
for (Schema subSchema : subSchemas) {
if (subSchema.getType().equals(schema.getType())) {
if (SchemaAssistant.getSchemaFullName(subSchema).equals(schemaFullName)) {
return index;
}
index++;
Expand All @@ -282,6 +283,13 @@ private void processUnion(final Schema unionSchema, JExpression unionExpr, JBloc
JClass optionClass = schemaAssistant.classFromSchema(schemaOption);
JClass rawOptionClass = schemaAssistant.classFromSchema(schemaOption, true, true);
JExpression condition = unionExpr._instanceof(rawOptionClass);
/**
* In Avro-1.4, neither GenericEnumSymbol or GenericFixed has associated schema, so we don't expect to see
* two or more Enum types or two or more Fixed types in the same Union in generic mode since the writer couldn't
* differentiate the Enum types or the Fixed types, but those scenarios are well supported in Avro-1.7 or above since
* both of them have associated 'Schema', so the serializer could recognize the right type
* by checking the associated 'Schema' in generic mode.
*/
if (useGenericTypes && SchemaAssistant.isNamedType(schemaOption)) {
condition = condition.cand(JExpr.invoke(JExpr.lit(schemaOption.getFullName()), "equals")
.arg(JExpr.invoke(JExpr.cast(optionClass, unionExpr), "getSchema").invoke("getFullName")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ public static boolean isComplexType(Schema schema) {
}
}

public static String getSchemaFullName(Schema schema) {
Schema.Type type = schema.getType();
boolean isNamedType = type.equals(Schema.Type.ENUM) || type.equals(Schema.Type.FIXED) || type.equals(Schema.Type.RECORD);
/**
* Avro-1.4 doesn't support {@link Schema#getFullName()} if the Schema is not a NamedSchema.
*/
return isNamedType ? schema.getFullName() : type.name();
}

public static boolean isNamedType(Schema schema) {
switch (schema.getType()) {
case RECORD:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,27 @@ public void shouldWriteSubRecordField() {
Assert.assertEquals("abc", record.get("field").toString());
}

@Test(groups = {"serializationTest"})
public void shouldWriteRightUnionIndex() {
// Create two record schemas
Schema recordSchema1 = createRecord("record1", createField("record1_field1", Schema.create(Schema.Type.STRING)));
Schema recordSchema2 = createRecord("record2", createField("record2_field1", Schema.create(Schema.Type.STRING)));
Schema unionSchema = createUnionSchema(recordSchema1, recordSchema2);
Schema recordWrapperSchema = createRecord("recordWrapper", createField("union_field", unionSchema));

GenericData.Record objectOfRecordSchema2 = new GenericData.Record(recordSchema2);
objectOfRecordSchema2.put("record2_field1", "abc");
GenericData.Record wrapperObject = new GenericData.Record(recordWrapperSchema);
wrapperObject.put("union_field", objectOfRecordSchema2);

GenericRecord record = decodeRecord(recordWrapperSchema, dataAsBinaryDecoder(wrapperObject));

Object unionField = record.get("union_field");
Assert.assertTrue(unionField instanceof GenericData.Record);
GenericData.Record unionRecord = (GenericData.Record)unionField;
Assert.assertEquals(unionRecord.getSchema().getName(), "record2");
}

@Test(groups = {"serializationTest"})
public void shouldWriteSubRecordCollectionsField() {
// given
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.linkedin.avro.fastserde;

import java.util.Collections;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.Test;


public class SchemaAssistantTest {

@Test
public void testGetSchemaFullName() {
String namespace = "com.linkedin.avro.fastserde";
Assert.assertEquals("STRING", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.STRING)));
Assert.assertEquals("BYTES", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.BYTES)));
Assert.assertEquals("INT", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.INT)));
Assert.assertEquals("LONG", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.LONG)));
Assert.assertEquals("FLOAT", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.FLOAT)));
Assert.assertEquals("DOUBLE", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.DOUBLE)));
Assert.assertEquals("BOOLEAN", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.BOOLEAN)));
Assert.assertEquals("NULL", SchemaAssistant.getSchemaFullName(Schema.create(Schema.Type.NULL)));
Assert.assertEquals("com.linkedin.avro.fastserde.TestRecord", SchemaAssistant.getSchemaFullName(Schema.createRecord("TestRecord", "", namespace, false)));
Assert.assertEquals("com.linkedin.avro.fastserde.TestFixed", SchemaAssistant.getSchemaFullName(Schema.createFixed("TestFixed", "", namespace, 16)));
Assert.assertEquals("com.linkedin.avro.fastserde.TestEnum", SchemaAssistant.getSchemaFullName(Schema.createEnum("TestEnum", "", namespace, Collections
.emptyList())));
}
}

0 comments on commit e49d36e

Please sign in to comment.