Skip to content

Commit

Permalink
initial impl for AvroRecordUtil.validateNestedSchemasConsistent (#524)
Browse files Browse the repository at this point in the history
  • Loading branch information
radai-rosenblatt authored Nov 14, 2023
1 parent 3b5d833 commit 39547a2
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import com.linkedin.avroutil1.compatibility.avropath.ArrayPositionPredicate;
import com.linkedin.avroutil1.compatibility.avropath.AvroPath;
import com.linkedin.avroutil1.compatibility.avropath.LocationStep;
import com.linkedin.avroutil1.compatibility.avropath.MapKeyPredicate;
import com.linkedin.avroutil1.compatibility.avropath.UnionTypePredicate;
import com.linkedin.avroutil1.compatibility.exception.InconsistentSchemaException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
Expand Down Expand Up @@ -811,4 +820,150 @@ private static Enum<?> getSpecificEnumSymbol(Class<? extends Enum<?>> enumClass,
throw new IllegalStateException("while trying to resolve " + enumClass.getName() + ".valueOf(" + symbolStr + ")", e);
}
}

/**
* verifies that the schemas on any nested named types "contained" by a given IndexedRecord
* match the schemas declared by the records schema (or SCHEMA$ field for specific record classes).
* this is useful to validate in situations where various generated record classes can come from
* different jars (and so could have a schema different than the one contained in the outermost
* record's schema)
* @param record a record, which could contain other records or named types
* @throws com.linkedin.avroutil1.compatibility.exception.InconsistentSchemaException if the schema for the record does not match the schema declared by any nested object
*/
public static void validateNestedSchemasConsistent(IndexedRecord record) throws Exception {
if (record == null) {
return;
}
Schema outermost = record.getSchema();
Map<String, Schema> referenceDefinitions = AvroSchemaUtil.getAllDefinedSchemas(outermost);
AvroPath path = new AvroPath();
//we are trying to detect cases of multiple different definitions of the "same" schema.
//this means we cant cache results by schema fullname, but only by exact instance
//of org.apache.avro.Schema, hence the IndentityHashMap
IdentityHashMap<Schema, Boolean> visited = new IdentityHashMap<>();
SchemaComparisonConfiguration comparisonConfiguration = SchemaComparisonConfiguration.STRICT;
//noinspection deprecation
if (AvroCompatibilityHelper.getRuntimeAvroVersion().earlierThan(AvroVersion.AVRO_1_8)) {
//avro < 1.7.3 cant handle non-string properties, so we use a less strict comparison
comparisonConfiguration = SchemaComparisonConfiguration.PRE_1_7_3;
}
validateNestedRecordSchema(record, path, visited, referenceDefinitions, comparisonConfiguration);
}

private static void validateNestedRecordSchema(
IndexedRecord record,
AvroPath path,
IdentityHashMap<Schema, Boolean> visited,
Map<String, Schema> referenceDefinitions,
SchemaComparisonConfiguration comparisonConfiguration
) throws InconsistentSchemaException {
Schema recordSchema = record.getSchema();

//validate current record's schema (this uses a cache internally to reduce dup work)
validateNestedSchema(recordSchema, path, visited, referenceDefinitions, comparisonConfiguration);

//now recurse down into values in fields
List<Schema.Field> fields = recordSchema.getFields();
for (Schema.Field field : fields) {
Object fieldValue = record.get(field.pos());
if (fieldValue == null) {
continue;
}
path.appendPath(new LocationStep(".", field.name()));
try {
Schema fieldSchema = field.schema();
Schema valueSchema;
switch (fieldSchema.getType()) {
case UNION:
valueSchema = AvroSchemaUtil.resolveUnionBranchOf(fieldValue, fieldSchema); // !=null
path.appendPath(new UnionTypePredicate(valueSchema.getName()));
try {
validateNestedValue(fieldValue, path, visited, referenceDefinitions, comparisonConfiguration);
} finally {
path.pop();
}
break;
case ARRAY:
Collection<?> arr = (Collection<?>) fieldValue;
Iterator<?> iter = arr.iterator();
int i=0;
while (iter.hasNext()) {
Object value = iter.next();
path.appendPath(new ArrayPositionPredicate(i));
try {
validateNestedValue(value, path, visited, referenceDefinitions, comparisonConfiguration);
} finally {
path.pop();
i++;
}
}
break;
case MAP:
@SuppressWarnings("unchecked")
Map<CharSequence, ?> map = (Map<CharSequence, ?>) fieldValue;
for (Map.Entry<CharSequence, ?> entry : map.entrySet()) {
CharSequence key = entry.getKey();
Object value = entry.getValue();
path.appendPath(new MapKeyPredicate(key));
try {
validateNestedValue(value, path, visited, referenceDefinitions, comparisonConfiguration);
} finally {
path.pop();
}
}
break;
default:
validateNestedValue(fieldValue, path, visited, referenceDefinitions, comparisonConfiguration);
}
} finally {
path.pop();
}
}
}

private static void validateNestedValue(
Object value,
AvroPath path,
IdentityHashMap<Schema, Boolean> visited,
Map<String, Schema> referenceDefinitions,
SchemaComparisonConfiguration comparisonConfiguration
) throws InconsistentSchemaException {
if (value == null) {
return;
}
if (value instanceof IndexedRecord) {
//generic or specific record (dont care which)
IndexedRecord record = (IndexedRecord) value;
validateNestedRecordSchema(record, path, visited, referenceDefinitions, comparisonConfiguration);
} else {
Schema declaredSchema = AvroSchemaUtil.getDeclaredSchema(value);
if (declaredSchema != null) {
validateNestedSchema(declaredSchema, path, visited, referenceDefinitions, comparisonConfiguration);
}
}
}

private static void validateNestedSchema(
Schema schema,
AvroPath path,
IdentityHashMap<Schema, Boolean> visited,
Map<String, Schema> referenceDefinitions,
SchemaComparisonConfiguration comparisonConfiguration
) throws InconsistentSchemaException {
if (!visited.containsKey(schema)) {
visited.put(schema, Boolean.TRUE); //never again
String fullName = schema.getFullName();
Schema reference = referenceDefinitions.get(fullName);
if (reference == null) {
throw new InconsistentSchemaException(path, "unexpected schema not in reference set: " + fullName + " at " + path);
}
if (!ConfigurableSchemaComparator.equals(schema, reference, comparisonConfiguration)) {
@SuppressWarnings("rawtypes")
AvscWriter avscWriter = AvroCompatibilityHelper.getAvscWriter(AvscGenerationConfig.CORRECT_ONELINE, null);
String actual = avscWriter.toAvsc(schema);
String expected = avscWriter.toAvsc(reference);
throw new InconsistentSchemaException(path, "schema " + fullName + " at " + path + " does not match nested definition in top level record. found: " + actual + ". expecting: " + expected);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@
public class MapKeyPredicate implements PositionalPathPredicate {
private final String key;

@Deprecated
public MapKeyPredicate(String key) {
this.key = key;
}

public MapKeyPredicate(CharSequence key) {
this.key = key == null ? null : String.valueOf(key);
}

public String getKey() {
return key;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2023 LinkedIn Corp.
* Licensed under the BSD 2-Clause License (the "License").
* See License in the project root for license information.
*/

package com.linkedin.avroutil1.compatibility.exception;

import com.linkedin.avroutil1.compatibility.avropath.AvroPath;

/**
* thrown to indicate there are multiple conflicting schema definitions
* within the same object graph or group of generated specific record classes
*/
public class InconsistentSchemaException extends Exception {
/**
* indicates the path to the location of the conflicting schema definition, from the
* "root" (outer-most) object or schema.
*/
private final AvroPath path;

public InconsistentSchemaException(AvroPath path, String msg) {
super(msg);
this.path = path;
}

public AvroPath getPath() {
return path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
import com.linkedin.avroutil1.testcommon.TestUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.Test;
import under14.newnewpkg.inner.NewNewInnerRecordWithAliases;
Expand Down Expand Up @@ -255,6 +258,120 @@ public void testStringCollectionFieldSetting() throws Exception {
Assert.assertEquals(record.mapOfStrings, expected);
}

@Test
public void testInnerSchemaValidation() throws Exception {
String outerAvsc = TestUtil.load("allavro/innerSchemaValidation/OuterRecord.avsc");
String badRecordAvsc = TestUtil.load("allavro/innerSchemaValidation/BadInnerRecord.avsc");
String badEnumAvsc = TestUtil.load("allavro/innerSchemaValidation/BadEnum.avsc");
String badFixedAvsc = TestUtil.load("allavro/innerSchemaValidation/BadFixed.avsc");

Schema outerSchema = Schema.parse(outerAvsc);
Schema goodRecordSchema = outerSchema.getField("recordField").schema();
Schema badRecordSchema = Schema.parse(badRecordAvsc);
Schema badEnumSchema = Schema.parse(badEnumAvsc);
Schema badFixedSchema = Schema.parse(badFixedAvsc);

RandomRecordGenerator generator = new RandomRecordGenerator();

IndexedRecord outerRecord = (IndexedRecord) generator.randomGeneric(outerSchema, RecordGenerationConfig.NO_NULLS);

//expected to pass
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);

IndexedRecord badInnerRecord = (IndexedRecord) generator.randomGeneric(badRecordSchema);
Object prevValue = outerRecord.get(0);
outerRecord.put(0, badInnerRecord);

try {
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);
Assertions.fail("expected to fail");
} catch (Exception expected) {
//expected
}

//restore record to good state
outerRecord.put(0, prevValue);
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);

//mutilate the union field
prevValue = outerRecord.get(1);
outerRecord.put(1, badInnerRecord);
try {
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);
Assertions.fail("expected to fail");
} catch (Exception expected) {
//expected
}
//restore record to good state
outerRecord.put(1, prevValue);
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);

//mutilate enums in list field if avro > 1.4
//noinspection deprecation
if (AvroCompatibilityHelper.getRuntimeAvroVersion().laterThan(AvroVersion.AVRO_1_4)) {
GenericData.EnumSymbol badEnum = (GenericData.EnumSymbol) generator.randomGeneric(badEnumSchema);

@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) outerRecord.get(2);
IndexedRecord listItem;
if (list.isEmpty()) {
//put one InnerRecord into the list to play with
listItem = (IndexedRecord) generator.randomGeneric(goodRecordSchema, RecordGenerationConfig.NO_NULLS);
list.add(listItem);
} else {
listItem = (IndexedRecord) list.get(0); //InnerRecord
}
prevValue = listItem.get(0); //good enum instance

listItem.put(0, badEnum);
try {
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);
Assertions.fail("expected to fail");
} catch (Exception expected) {
//expected
}

//restore record to good state
listItem.put(0, prevValue);
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);
}

//mutilate fixeds in map field if avro > 1.4
//noinspection deprecation
if (AvroCompatibilityHelper.getRuntimeAvroVersion().laterThan(AvroVersion.AVRO_1_4)) {
GenericData.Fixed badFixed = (GenericData.Fixed) generator.randomGeneric(badFixedSchema);

@SuppressWarnings("unchecked")
Map<CharSequence, Object> map = (Map<CharSequence, Object>) outerRecord.get(3);
CharSequence key;
IndexedRecord mapItem;
if (map.isEmpty()) {
//put one InnerRecord into the map to play with
key = new Utf8("whatever");
mapItem = (IndexedRecord) generator.randomGeneric(goodRecordSchema, RecordGenerationConfig.NO_NULLS);
map.put(key, mapItem);
} else {
Map.Entry<CharSequence, ?> entry = map.entrySet().iterator().next();
key = entry.getKey();
mapItem = (IndexedRecord) entry.getValue(); //InnerRecord
}

prevValue = mapItem.get(1); //good fixed instance

mapItem.put(1, badFixed);
try {
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);
Assertions.fail("expected to fail");
} catch (Exception expected) {
//expected
}

//restore record to good state
mapItem.put(1, prevValue);
AvroRecordUtil.validateNestedSchemasConsistent(outerRecord);
}
}

private void convertRoundTrip(GenericRecord original) {
Assert.assertNotNull(original);
SpecificRecord converted = AvroRecordUtil.genericRecordToSpecificRecord(original, null, RecordConversionConfig.ALLOW_ALL_USE_UTF8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ public void testSchemaBuilderWithNamespaceInherit(String originalAvscFile, Strin
validateSchema(originalSchema, newSchema, nameSpace, inheritNamespace, 0);

String newAvsc = AvroCompatibilityHelper.toAvsc(newSchema, AvscGenerationConfig.CORRECT_PRETTY);
//on windows newAvsc has \r\n for linebreaks, which fails the check below
String newAvscSansCarriageReturns = newAvsc.replaceAll("\r", "");
String targetAvsc = TestUtil.load(resultAvscFile);
Assert.assertEquals(newAvsc, targetAvsc);
Assert.assertEquals(newAvscSansCarriageReturns, targetAvsc);
}

private void validateSchema(Schema originalSchema, Schema newSchema, String parentNameSpace, boolean inheritNamespace,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"type": "enum",
"namespace": "allavro.innerSchemaValidation",
"name": "InnerEnum",
"symbols": [
"A", "B", "EVIL"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "fixed",
"namespace": "allavro.innerSchemaValidation",
"name": "InnerFixed",
"size": 666
}
Loading

0 comments on commit 39547a2

Please sign in to comment.