diff --git a/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/AvroRecordUtil.java b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/AvroRecordUtil.java index a0d961175..8f23f203b 100644 --- a/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/AvroRecordUtil.java +++ b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/AvroRecordUtil.java @@ -12,14 +12,23 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import com.linkedin.avroutil1.compatibility.avropath.ArrayPositionPredicate; +import com.linkedin.avroutil1.compatibility.avropath.AvroPath; +import com.linkedin.avroutil1.compatibility.avropath.LocationStep; +import com.linkedin.avroutil1.compatibility.avropath.MapKeyPredicate; +import com.linkedin.avroutil1.compatibility.avropath.UnionTypePredicate; +import com.linkedin.avroutil1.compatibility.exception.InconsistentSchemaException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericEnumSymbol; @@ -811,4 +820,150 @@ private static Enum getSpecificEnumSymbol(Class> enumClass, throw new IllegalStateException("while trying to resolve " + enumClass.getName() + ".valueOf(" + symbolStr + ")", e); } } + + /** + * verifies that the schemas on any nested named types "contained" by a given IndexedRecord + * match the schemas declared by the record's schema (or SCHEMA$ field for specific record classes). 
+ * this is useful to validate in situations where various generated record classes can come from + * different jars (and so could have a schema different from the one contained in the outermost + * record's schema) + * @param record a record, which could contain other records or named types (null is accepted and skips validation) + * @throws com.linkedin.avroutil1.compatibility.exception.InconsistentSchemaException if a nested object's schema is missing from, or does not match, the definitions in the outermost record's schema + */ + public static void validateNestedSchemasConsistent(IndexedRecord record) throws Exception { + if (record == null) { + return; + } + Schema outermost = record.getSchema(); + Map referenceDefinitions = AvroSchemaUtil.getAllDefinedSchemas(outermost); + AvroPath path = new AvroPath(); + //we are trying to detect cases of multiple different definitions of the "same" schema. + //this means we can't cache results by schema fullname, but only by exact instance + //of org.apache.avro.Schema, hence the IdentityHashMap + IdentityHashMap visited = new IdentityHashMap<>(); + SchemaComparisonConfiguration comparisonConfiguration = SchemaComparisonConfiguration.STRICT; + //noinspection deprecation + if (AvroCompatibilityHelper.getRuntimeAvroVersion().earlierThan(AvroVersion.AVRO_1_8)) { + //avro < 1.7.3 can't handle non-string properties, so we use a less strict comparison (NOTE(review): the check above is against 1.8, not 1.7.3 - confirm this is intended) + comparisonConfiguration = SchemaComparisonConfiguration.PRE_1_7_3; + } + validateNestedRecordSchema(record, path, visited, referenceDefinitions, comparisonConfiguration); + } + + private static void validateNestedRecordSchema( + IndexedRecord record, + AvroPath path, + IdentityHashMap visited, + Map referenceDefinitions, + SchemaComparisonConfiguration comparisonConfiguration + ) throws InconsistentSchemaException { + Schema recordSchema = record.getSchema(); + + //validate current record's schema (Schema instances already in the visited map are skipped, to reduce dup work) + validateNestedSchema(recordSchema, path, visited, referenceDefinitions, comparisonConfiguration); + 
+ //now recurse down into values in fields (null field values are skipped). NOTE(review): only a collection that is the direct value of an array/map field is iterated here - validateNestedValue does not iterate, so elements inside nested collections or array/map union branches look unvisited; confirm + List fields = recordSchema.getFields(); + for (Schema.Field field : fields) { + Object fieldValue = record.get(field.pos()); + if (fieldValue == null) { + continue; + } + path.appendPath(new LocationStep(".", field.name())); + try { + Schema fieldSchema = field.schema(); + Schema valueSchema; + switch (fieldSchema.getType()) { + case UNION: + valueSchema = AvroSchemaUtil.resolveUnionBranchOf(fieldValue, fieldSchema); // !=null + path.appendPath(new UnionTypePredicate(valueSchema.getName())); + try { + validateNestedValue(fieldValue, path, visited, referenceDefinitions, comparisonConfiguration); + } finally { + path.pop(); + } + break; + case ARRAY: + Collection arr = (Collection) fieldValue; + Iterator iter = arr.iterator(); + int i=0; + while (iter.hasNext()) { + Object value = iter.next(); + path.appendPath(new ArrayPositionPredicate(i)); + try { + validateNestedValue(value, path, visited, referenceDefinitions, comparisonConfiguration); + } finally { + path.pop(); + i++; + } + } + break; + case MAP: + @SuppressWarnings("unchecked") + Map map = (Map) fieldValue; + for (Map.Entry entry : map.entrySet()) { + CharSequence key = entry.getKey(); + Object value = entry.getValue(); + path.appendPath(new MapKeyPredicate(key)); + try { + validateNestedValue(value, path, visited, referenceDefinitions, comparisonConfiguration); + } finally { + path.pop(); + } + } + break; + default: + validateNestedValue(fieldValue, path, visited, referenceDefinitions, comparisonConfiguration); + } + } finally { + path.pop(); + } + } + } + + private static void validateNestedValue( + Object value, + AvroPath path, + IdentityHashMap visited, + Map referenceDefinitions, + SchemaComparisonConfiguration comparisonConfiguration + ) throws InconsistentSchemaException { + if (value == null) { + return; + } + if (value instanceof IndexedRecord) { + //generic or specific record (don't care which) + IndexedRecord record = (IndexedRecord) value; + 
validateNestedRecordSchema(record, path, visited, referenceDefinitions, comparisonConfiguration); + } else { + Schema declaredSchema = AvroSchemaUtil.getDeclaredSchema(value); + if (declaredSchema != null) { + validateNestedSchema(declaredSchema, path, visited, referenceDefinitions, comparisonConfiguration); + } + } + } + + private static void validateNestedSchema( + Schema schema, + AvroPath path, + IdentityHashMap visited, + Map referenceDefinitions, + SchemaComparisonConfiguration comparisonConfiguration + ) throws InconsistentSchemaException { + if (!visited.containsKey(schema)) { + visited.put(schema, Boolean.TRUE); //never validate this exact Schema instance again + String fullName = schema.getFullName(); + Schema reference = referenceDefinitions.get(fullName); + if (reference == null) { + throw new InconsistentSchemaException(path, "unexpected schema not in reference set: " + fullName + " at " + path); + } + if (!ConfigurableSchemaComparator.equals(schema, reference, comparisonConfiguration)) { + @SuppressWarnings("rawtypes") + AvscWriter avscWriter = AvroCompatibilityHelper.getAvscWriter(AvscGenerationConfig.CORRECT_ONELINE, null); + String actual = avscWriter.toAvsc(schema); + String expected = avscWriter.toAvsc(reference); + throw new InconsistentSchemaException(path, "schema " + fullName + " at " + path + " does not match nested definition in top level record. found: " + actual + ". 
expecting: " + expected); + } + } + } } diff --git a/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/avropath/MapKeyPredicate.java b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/avropath/MapKeyPredicate.java index 792cef749..f26bdbce1 100644 --- a/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/avropath/MapKeyPredicate.java +++ b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/avropath/MapKeyPredicate.java @@ -31,10 +31,15 @@ public class MapKeyPredicate implements PositionalPathPredicate { private final String key; + @Deprecated public MapKeyPredicate(String key) { this.key = key; } + public MapKeyPredicate(CharSequence key) { + this.key = key == null ? null : String.valueOf(key); + } + public String getKey() { return key; } diff --git a/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/exception/InconsistentSchemaException.java b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/exception/InconsistentSchemaException.java new file mode 100644 index 000000000..ad68191f9 --- /dev/null +++ b/helper/helper/src/main/java/com/linkedin/avroutil1/compatibility/exception/InconsistentSchemaException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 LinkedIn Corp. + * Licensed under the BSD 2-Clause License (the "License"). + * See License in the project root for license information. + */ + +package com.linkedin.avroutil1.compatibility.exception; + +import com.linkedin.avroutil1.compatibility.avropath.AvroPath; + +/** + * thrown to indicate there are multiple conflicting schema definitions + * within the same object graph or group of generated specific record classes + */ +public class InconsistentSchemaException extends Exception { + /** + * indicates the path to the location of the conflicting schema definition, from the + * "root" (outer-most) object or schema. 
+ */ + private final AvroPath path; + + public InconsistentSchemaException(AvroPath path, String msg) { + super(msg); + this.path = path; + } + + public AvroPath getPath() { + return path; + } +} diff --git a/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/AvroRecordUtilTest.java b/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/AvroRecordUtilTest.java index a29943b98..875378827 100644 --- a/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/AvroRecordUtilTest.java +++ b/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/AvroRecordUtilTest.java @@ -9,13 +9,16 @@ import com.linkedin.avroutil1.testcommon.TestUtil; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; +import org.assertj.core.api.Assertions; import org.testng.Assert; import org.testng.annotations.Test; import under14.newnewpkg.inner.NewNewInnerRecordWithAliases; @@ -255,6 +258,120 @@ public void testStringCollectionFieldSetting() throws Exception { Assert.assertEquals(record.mapOfStrings, expected); } + @Test + public void testInnerSchemaValidation() throws Exception { + String outerAvsc = TestUtil.load("allavro/innerSchemaValidation/OuterRecord.avsc"); + String badRecordAvsc = TestUtil.load("allavro/innerSchemaValidation/BadInnerRecord.avsc"); + String badEnumAvsc = TestUtil.load("allavro/innerSchemaValidation/BadEnum.avsc"); + String badFixedAvsc = TestUtil.load("allavro/innerSchemaValidation/BadFixed.avsc"); + + Schema outerSchema = Schema.parse(outerAvsc); + Schema goodRecordSchema = 
outerSchema.getField("recordField").schema(); + Schema badRecordSchema = Schema.parse(badRecordAvsc); + Schema badEnumSchema = Schema.parse(badEnumAvsc); + Schema badFixedSchema = Schema.parse(badFixedAvsc); + + RandomRecordGenerator generator = new RandomRecordGenerator(); + + IndexedRecord outerRecord = (IndexedRecord) generator.randomGeneric(outerSchema, RecordGenerationConfig.NO_NULLS); + + //expected to pass + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + + IndexedRecord badInnerRecord = (IndexedRecord) generator.randomGeneric(badRecordSchema); + Object prevValue = outerRecord.get(0); + outerRecord.put(0, badInnerRecord); + + try { + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + Assertions.fail("expected to fail"); + } catch (Exception expected) { + //expected + } + + //restore record to good state + outerRecord.put(0, prevValue); + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + + //mutilate the union field + prevValue = outerRecord.get(1); + outerRecord.put(1, badInnerRecord); + try { + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + Assertions.fail("expected to fail"); + } catch (Exception expected) { + //expected + } + //restore record to good state + outerRecord.put(1, prevValue); + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + + //mutilate enums in list field if avro > 1.4 + //noinspection deprecation + if (AvroCompatibilityHelper.getRuntimeAvroVersion().laterThan(AvroVersion.AVRO_1_4)) { + GenericData.EnumSymbol badEnum = (GenericData.EnumSymbol) generator.randomGeneric(badEnumSchema); + + @SuppressWarnings("unchecked") + List list = (List) outerRecord.get(2); + IndexedRecord listItem; + if (list.isEmpty()) { + //put one InnerRecord into the list to play with + listItem = (IndexedRecord) generator.randomGeneric(goodRecordSchema, RecordGenerationConfig.NO_NULLS); + list.add(listItem); + } else { + listItem = (IndexedRecord) list.get(0); //InnerRecord + } + prevValue = 
listItem.get(0); //good enum instance + + listItem.put(0, badEnum); + try { + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + Assertions.fail("expected to fail"); + } catch (Exception expected) { + //expected + } + + //restore record to good state + listItem.put(0, prevValue); + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + } + + //mutilate fixeds in map field if avro > 1.4 + //noinspection deprecation + if (AvroCompatibilityHelper.getRuntimeAvroVersion().laterThan(AvroVersion.AVRO_1_4)) { + GenericData.Fixed badFixed = (GenericData.Fixed) generator.randomGeneric(badFixedSchema); + + @SuppressWarnings("unchecked") + Map map = (Map) outerRecord.get(3); + CharSequence key; + IndexedRecord mapItem; + if (map.isEmpty()) { + //put one InnerRecord into the map to play with + key = new Utf8("whatever"); + mapItem = (IndexedRecord) generator.randomGeneric(goodRecordSchema, RecordGenerationConfig.NO_NULLS); + map.put(key, mapItem); + } else { + Map.Entry entry = map.entrySet().iterator().next(); + key = entry.getKey(); + mapItem = (IndexedRecord) entry.getValue(); //InnerRecord + } + + prevValue = mapItem.get(1); //good fixed instance + + mapItem.put(1, badFixed); + try { + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + Assertions.fail("expected to fail"); + } catch (Exception expected) { + //expected + } + + //restore record to good state + mapItem.put(1, prevValue); + AvroRecordUtil.validateNestedSchemasConsistent(outerRecord); + } + } + private void convertRoundTrip(GenericRecord original) { Assert.assertNotNull(original); SpecificRecord converted = AvroRecordUtil.genericRecordToSpecificRecord(original, null, RecordConversionConfig.ALLOW_ALL_USE_UTF8); diff --git a/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/SchemaBuilderNamespaceInheritTest.java b/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/SchemaBuilderNamespaceInheritTest.java index 
2cab1ebc6..305b54b62 100644 --- a/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/SchemaBuilderNamespaceInheritTest.java +++ b/helper/tests/helper-tests-allavro/src/test/java/com/linkedin/avroutil1/compatibility/SchemaBuilderNamespaceInheritTest.java @@ -44,8 +44,10 @@ public void testSchemaBuilderWithNamespaceInherit(String originalAvscFile, Strin validateSchema(originalSchema, newSchema, nameSpace, inheritNamespace, 0); String newAvsc = AvroCompatibilityHelper.toAvsc(newSchema, AvscGenerationConfig.CORRECT_PRETTY); + //on windows newAvsc has \r\n for linebreaks, which fails the check below + String newAvscSansCarriageReturns = newAvsc.replaceAll("\r", ""); String targetAvsc = TestUtil.load(resultAvscFile); - Assert.assertEquals(newAvsc, targetAvsc); + Assert.assertEquals(newAvscSansCarriageReturns, targetAvsc); } private void validateSchema(Schema originalSchema, Schema newSchema, String parentNameSpace, boolean inheritNamespace, diff --git a/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadEnum.avsc b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadEnum.avsc new file mode 100644 index 000000000..0fde7140d --- /dev/null +++ b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadEnum.avsc @@ -0,0 +1,8 @@ +{ + "type": "enum", + "namespace": "allavro.innerSchemaValidation", + "name": "InnerEnum", + "symbols": [ + "A", "B", "EVIL" + ] +} \ No newline at end of file diff --git a/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadFixed.avsc b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadFixed.avsc new file mode 100644 index 000000000..211fc609b --- /dev/null +++ b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadFixed.avsc @@ -0,0 +1,6 @@ +{ + "type": "fixed", + "namespace": "allavro.innerSchemaValidation", + 
"name": "InnerFixed", + "size": 666 +} \ No newline at end of file diff --git a/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadInnerRecord.avsc b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadInnerRecord.avsc new file mode 100644 index 000000000..d0d678eb0 --- /dev/null +++ b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/BadInnerRecord.avsc @@ -0,0 +1,29 @@ +{ + "type": "record", + "namespace": "allavro.innerSchemaValidation", + "name": "InnerRecord", + "fields": [ + { + "name": "enumField", + "type": { + "type": "enum", + "name": "InnerEnum", + "symbols": [ + "A", "B" + ] + } + }, + { + "name": "fixedField", + "type": { + "type": "fixed", + "name": "InnerFixed", + "size": 42 + } + }, + { + "name": "evilField", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/OuterRecord.avsc b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/OuterRecord.avsc new file mode 100644 index 000000000..b4af203b2 --- /dev/null +++ b/helper/tests/helper-tests-allavro/src/test/resources/allavro/innerSchemaValidation/OuterRecord.avsc @@ -0,0 +1,55 @@ +{ + "type": "record", + "namespace": "allavro.innerSchemaValidation", + "name": "OuterRecord", + "fields": [ + { + "name": "recordField", + "type": { + "type": "record", + "name": "InnerRecord", + "fields": [ + { + "name": "enumField", + "type": { + "type": "enum", + "name": "InnerEnum", + "symbols": [ + "A", "B" + ] + } + }, + { + "name": "fixedField", + "type": { + "type": "fixed", + "name": "InnerFixed", + "size": 42 + } + } + ] + } + }, + { + "name": "unionField", + "type": [ + "null", + "InnerRecord" + ] + }, + { + "name": "listField", + "type": { + "type": "array", + "items": "InnerRecord" + } + }, + { + "name": "mapField", + "type": { + "type": "map", + "values": "InnerRecord" + } + } + ] 
+} \ No newline at end of file