Skip to content

Commit

Permalink
initial SchemaBuilder support (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
radai-rosenblatt authored Jun 26, 2021
1 parent d99099d commit 606c576
Show file tree
Hide file tree
Showing 21 changed files with 796 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2021 LinkedIn Corp.
* Licensed under the BSD 2-Clause License (the "License").
* See License in the project root for license information.
*/

package com.linkedin.avroutil1.compatibility;

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;

public abstract class AbstractSchemaBuilder implements SchemaBuilder {

protected final AvroAdapter _adapter;
protected Schema.Type _type;
protected String _name;
protected String _namespace;
protected String _doc;
protected boolean _isError;
protected List<Schema.Field> _fields;

protected AbstractSchemaBuilder(AvroAdapter _adapter, Schema original) {
this._adapter = _adapter;
_type = original.getType();
_name = original.getName();
_namespace = original.getNamespace();
_doc = original.getDoc();
_isError = original.isError();
//make a copy of fields so its mutable
_fields = new ArrayList<>(original.getFields());
}

@Override
public SchemaBuilder addField(Schema.Field field) {
checkNewField(field);
_fields.add(field);
return this;
}

@Override
public SchemaBuilder addField(int position, Schema.Field field) {
checkNewField(field);
_fields.add(position, field); //will throw IOOB on bad positions
return this;
}

@Override
public SchemaBuilder removeField(String fieldName) {
if (fieldName == null || fieldName.isEmpty()) {
throw new IllegalArgumentException("argument cannot be null or empty");
}
int index = fieldPositionByName(fieldName, false);
if (index >= 0) {
_fields.remove(index);
}
return this;
}

@Override
public SchemaBuilder removeField(int position) {
_fields.remove(position); //throws IOOB
return this;
}

/**
* {@link Schema.Field} has a position ("pos") property that is set when its added to a schema.
* this means we need to clone fields to add them to another schema
* @param originals list of fields to clone
* @return list of cloned fields
*/
protected List<Schema.Field> cloneFields(List<Schema.Field> originals) {
List<Schema.Field> clones = new ArrayList<>(originals.size());
for (Schema.Field original : originals) {
FieldBuilder fb = _adapter.cloneSchemaField(original);
Schema.Field clone = fb.build();
clones.add(clone);
}
return clones;
}

protected void checkNewField(Schema.Field field) {
if (field == null) {
throw new IllegalArgumentException("argument cannot be null");
}
int otherIndex = fieldPositionByName(field.name(), false);
if (otherIndex >= 0) {
throw new IllegalArgumentException("schema already contains a field called " + field.name());
}
}

protected int fieldPositionByName(String name, boolean caseSensitive) {
for (int i = 0; i < _fields.size(); i++) {
Schema.Field candidate = _fields.get(i);
String cName = candidate.name();
if (caseSensitive) {
if (cName.equals(name)) {
return i;
}
} else if (cName.equalsIgnoreCase(name)){
return i;
}
}
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ BinaryDecoder newBinaryDecoder(byte[] bytes, int offset,

FieldBuilder newFieldBuilder(String name);

SchemaBuilder cloneSchema(Schema schema);

String getFieldPropAsJsonString(Schema.Field field, String propName);

String getSchemaPropAsJsonString(Schema schema, String propName);

//code generation

Collection<AvroGeneratedSourceCode> compile(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021 LinkedIn Corp.
* Licensed under the BSD 2-Clause License (the "License").
* See License in the project root for license information.
*/

package com.linkedin.avroutil1.compatibility;

import org.apache.avro.Schema;

/**
* Builder for creating {@link Schema} instances at runtime
*/
public interface SchemaBuilder {

/**
* add a field to the schema under construction. field is added
* to the end of the field list
* @param field new field to add
* @return the builder
* @throws IllegalArgumentException if a field by the same name
* (case INSENSITIVE) exists
*/
SchemaBuilder addField(Schema.Field field);

/**
* add a field to the schema under construction at the specified position.
* existing fields starting from the specified position are "right shifted"
* @param position desired position for the field to be added, 0 based.
* @param field field to add
* @return the builder
* @throws IllegalArgumentException if a field by the same name
* (case INSENSITIVE) exists
* @throws IndexOutOfBoundsException if index is invalid
*/
SchemaBuilder addField(int position, Schema.Field field);

/**
* removes a field by its (case INSENSITIVE) name, if such a field exists.
* @param fieldName name of field to be removed. required.
* @return the builder
* @throws IllegalArgumentException if argument is null or emoty
*/
SchemaBuilder removeField(String fieldName);

/**
* removes a field by its position.
* @param position position (0 based) os the field to remove
* @return the builder
* @throws IndexOutOfBoundsException if position is invalid
*/
SchemaBuilder removeField(int position);

/**
* constructs a {@link Schema} out of this builder
* @return a {@link Schema}
*/
Schema build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ public static FieldBuilder cloneSchemaField(Schema.Field field) {
return ADAPTER.cloneSchemaField(field);
}

public static SchemaBuilder cloneSchema(Schema schema) {
assertAvroAvailable();
return ADAPTER.cloneSchema(schema);
}

// code generation

/**
Expand Down Expand Up @@ -646,4 +651,8 @@ public static Schema.Field createSchemaField(String name, Schema schema, String
public static String getFieldPropAsJsonString(Schema.Field field, String propName) {
return ADAPTER.getFieldPropAsJsonString(field, propName);
}

public static String getSchemaPropAsJsonString(Schema schema, String propName) {
return ADAPTER.getSchemaPropAsJsonString(schema, propName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.avroutil1.compatibility.CodeGenerationConfig;
import com.linkedin.avroutil1.compatibility.CodeTransformations;
import com.linkedin.avroutil1.compatibility.FieldBuilder;
import com.linkedin.avroutil1.compatibility.SchemaBuilder;
import com.linkedin.avroutil1.compatibility.SchemaParseConfiguration;
import com.linkedin.avroutil1.compatibility.SchemaParseResult;
import com.linkedin.avroutil1.compatibility.SkipDecoder;
Expand Down Expand Up @@ -266,19 +267,21 @@ public FieldBuilder newFieldBuilder(String name) {
return new FieldBuilder110(name);
}

@Override
public SchemaBuilder cloneSchema(Schema schema) {
return new SchemaBuilder110(this, schema);
}

@Override
public String getFieldPropAsJsonString(Schema.Field field, String propName) {
Object val = field.getObjectProp(propName);
if (val == null) {
return null;
}
//the following is VERY rube-goldberg-ish, but will do until someone complains
JsonNode asJsonNode = JacksonUtils.toJsonNode(val);
try {
return OBJECT_MAPPER.writeValueAsString(asJsonNode);
} catch (Exception issue) {
throw new IllegalStateException("while trying to serialize " + val + " (a " + val.getClass().getName() + ")", issue);
}
return objectPropToJsonString(val);
}

@Override
public String getSchemaPropAsJsonString(Schema schema, String propName) {
Object val = schema.getObjectProp(propName);
return objectPropToJsonString(val);
}

@Override
Expand Down Expand Up @@ -341,6 +344,19 @@ public Collection<AvroGeneratedSourceCode> compile(
}
}

private String objectPropToJsonString(Object val) {
if (val == null) {
return null;
}
//the following is VERY rube-goldberg-ish, but will do until someone complains
JsonNode asJsonNode = JacksonUtils.toJsonNode(val);
try {
return OBJECT_MAPPER.writeValueAsString(asJsonNode);
} catch (Exception issue) {
throw new IllegalStateException("while trying to serialize " + val + " (a " + val.getClass().getName() + ")", issue);
}
}

private Collection<AvroGeneratedSourceCode> transform(List<AvroGeneratedSourceCode> avroGenerated, AvroVersion minAvro, AvroVersion maxAvro) {
List<AvroGeneratedSourceCode> transformed = new ArrayList<>(avroGenerated.size());
for (AvroGeneratedSourceCode generated : avroGenerated) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021 LinkedIn Corp.
* Licensed under the BSD 2-Clause License (the "License").
* See License in the project root for license information.
*/

package com.linkedin.avroutil1.compatibility.avro110;

import com.linkedin.avroutil1.compatibility.AbstractSchemaBuilder;
import com.linkedin.avroutil1.compatibility.AvroAdapter;
import org.apache.avro.Schema;

import java.util.Map;

public class SchemaBuilder110 extends AbstractSchemaBuilder {

private Map<String, Object> _props;

public SchemaBuilder110(AvroAdapter adapter, Schema original) {
super(adapter, original);
_props = original.getObjectProps();
}

@Override
public Schema build() {
if (_type == null) {
throw new IllegalArgumentException("type not set");
}
Schema result;
//noinspection SwitchStatementWithTooFewBranches
switch (_type) {
case RECORD:
result = Schema.createRecord(_name, _doc, _namespace, _isError);
result.setFields(cloneFields(_fields));
if (_props != null && !_props.isEmpty()) {
for (Map.Entry<String, Object> entry : _props.entrySet()) {
result.addProp(entry.getKey(), entry.getValue());
}
}
break;
default:
throw new UnsupportedOperationException("unhandled type " + _type);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.avroutil1.compatibility.CodeGenerationConfig;
import com.linkedin.avroutil1.compatibility.CodeTransformations;
import com.linkedin.avroutil1.compatibility.FieldBuilder;
import com.linkedin.avroutil1.compatibility.SchemaBuilder;
import com.linkedin.avroutil1.compatibility.SchemaNormalization;
import com.linkedin.avroutil1.compatibility.SchemaParseConfiguration;
import com.linkedin.avroutil1.compatibility.SchemaParseResult;
Expand Down Expand Up @@ -238,12 +239,23 @@ public FieldBuilder newFieldBuilder(String name) {
return new FieldBuilder14(name);
}

@Override
public SchemaBuilder cloneSchema(Schema schema) {
return new SchemaBuilder14(this, schema);
}

@Override
public String getFieldPropAsJsonString(Schema.Field field, String propName) {
String val = field.getProp(propName);
return val == null ? null : "\"" + val + "\"";
}

@Override
public String getSchemaPropAsJsonString(Schema schema, String propName) {
String val = schema.getProp(propName);
return val == null ? null : "\"" + val + "\"";
}

@Override
public Collection<AvroGeneratedSourceCode> compile(
Collection<Schema> toCompile,
Expand Down
Loading

0 comments on commit 606c576

Please sign in to comment.