Fixes for schema evolution with structs in collections #167

Merged · 1 commit · Nov 29, 2023
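In brief: schema-update collection moves from a flat List<SchemaUpdate> fed through updates::add to a deduplicating SchemaUpdate.Consumer, and convertMapValue gets a one-line fix so map values no longer resolve against the key's field ID. A minimal sketch of the failure mode the deduplication addresses (the input data and field names here are illustrative, not taken from the PR):

    import java.util.List;
    import java.util.Map;

    // A record value holding a list of structs whose "zip" field is not yet a
    // column in the Iceberg table. Conversion visits every element, so the old
    // List<SchemaUpdate> collected one AddColumn per element for the same column;
    // the new SchemaUpdate.Consumer keys updates by "parent.name" and keeps one.
    Map<String, Object> value =
        Map.of(
            "addresses",
            List.of(
                Map.of("zip", "10001"),
                Map.of("zip", "94105")));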
@@ -19,6 +19,7 @@
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
+import io.tabular.iceberg.connect.data.SchemaUpdate.Consumer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
@@ -82,18 +83,18 @@ private Record convertToRow(SinkRecord record) {
return recordConverter.convert(record.value());
}

-List<SchemaUpdate> updates = Lists.newArrayList();
-Record row = recordConverter.convert(record.value(), updates::add);
+SchemaUpdate.Consumer updates = new Consumer();
+Record row = recordConverter.convert(record.value(), updates);

-if (!updates.isEmpty()) {
+if (!updates.empty()) {
// complete the current file
flush();
// apply the schema updates, this will refresh the table
SchemaUtils.applySchemaUpdates(table, updates);
// initialize a new writer with the new schema
initNewWriter();
// convert the row again, this time using the new table schema
-row = recordConverter.convert(record.value(), updates::add);
+row = recordConverter.convert(record.value(), null);
}

return row;
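Note the retry path above: once the pending updates have been flushed and applied, the record is converted a second time with a null consumer instead of the collector. The null-handling branch is folded out of this view, but read from the call sites, a null consumer appears to mean: gather nothing further, and skip any field that still cannot be matched against the refreshed table schema.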
@@ -22,9 +22,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.tabular.iceberg.connect.IcebergSinkConfig;
-import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
-import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
-import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
@@ -46,7 +43,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
-import java.util.function.Consumer;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -94,7 +90,7 @@ public Record convert(Object data) {
return convert(data, null);
}

-public Record convert(Object data, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+public Record convert(Object data, SchemaUpdate.Consumer schemaUpdateConsumer) {
if (data instanceof Struct || data instanceof Map) {
return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer);
}
@@ -107,7 +103,7 @@ private NameMapping createNameMapping(Table table) {
}

private Object convertValue(
-Object value, Type type, int fieldId, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) {
if (value == null) {
return null;
}
@@ -151,7 +147,7 @@ protected GenericRecord convertStructValue(
Object value,
StructType schema,
int parentFieldId,
-Consumer<SchemaUpdate> schemaUpdateConsumer) {
+SchemaUpdate.Consumer schemaUpdateConsumer) {
if (value instanceof Map) {
return convertToStruct((Map<?, ?>) value, schema, parentFieldId, schemaUpdateConsumer);
} else if (value instanceof Struct) {
@@ -164,7 +160,7 @@ private GenericRecord convertToStruct(
Map<?, ?> map,
StructType schema,
int structFieldId,
-Consumer<SchemaUpdate> schemaUpdateConsumer) {
+SchemaUpdate.Consumer schemaUpdateConsumer) {
GenericRecord result = GenericRecord.create(schema);
map.forEach(
(recordFieldNameObj, recordFieldValue) -> {
@@ -178,8 +174,7 @@ private GenericRecord convertToStruct(
if (type.isPresent()) {
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
-schemaUpdateConsumer.accept(
-new AddColumn(parentFieldName, recordFieldName, type.get()));
+schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type.get());
}
}
} else {
@@ -199,7 +194,7 @@ private GenericRecord convertToStruct(
Struct struct,
StructType schema,
int structFieldId,
-Consumer<SchemaUpdate> schemaUpdateConsumer) {
+SchemaUpdate.Consumer schemaUpdateConsumer) {
GenericRecord result = GenericRecord.create(schema);
struct
.schema()
@@ -213,8 +208,7 @@ private GenericRecord convertToStruct(
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
-schemaUpdateConsumer.accept(
-new AddColumn(parentFieldName, recordField.name(), type));
+schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
}
} else {
boolean hasSchemaUpdates = false;
@@ -224,13 +218,13 @@ private GenericRecord convertToStruct(
SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema());
if (evolveDataType != null) {
String fieldName = tableSchema.findColumnName(tableField.fieldId());
-schemaUpdateConsumer.accept(new UpdateType(fieldName, evolveDataType));
+schemaUpdateConsumer.updateType(fieldName, evolveDataType);
hasSchemaUpdates = true;
}
// make optional if needed and schema evolution is on
if (tableField.isRequired() && recordField.schema().isOptional()) {
String fieldName = tableSchema.findColumnName(tableField.fieldId());
-schemaUpdateConsumer.accept(new MakeOptional(fieldName));
+schemaUpdateConsumer.makeOptional(fieldName);
hasSchemaUpdates = true;
}
}
@@ -277,7 +271,7 @@ private Map<String, NestedField> createStructNameMap(StructType schema) {
}

protected List<Object> convertListValue(
-Object value, ListType type, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+Object value, ListType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
Preconditions.checkArgument(value instanceof List);
List<?> list = (List<?>) value;
return list.stream()
@@ -290,14 +284,14 @@ protected List<Object> convertListValue(
}

protected Map<Object, Object> convertMapValue(
-Object value, MapType type, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+Object value, MapType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
Preconditions.checkArgument(value instanceof Map);
Map<?, ?> map = (Map<?, ?>) value;
Map<Object, Object> result = Maps.newHashMap();
map.forEach(
(k, v) -> {
int keyFieldId = type.fields().get(0).fieldId();
-int valueFieldId = type.fields().get(0).fieldId();
+int valueFieldId = type.fields().get(1).fieldId();
result.put(
convertValue(k, type.keyType(), keyFieldId, schemaUpdateConsumer),
convertValue(v, type.valueType(), valueFieldId, schemaUpdateConsumer));
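The get(0)/get(1) change just above is the collections-side bug fix: in Iceberg's MapType, fields() returns the key field at index 0 and the value field at index 1, so the old code resolved map values against the key's field ID. A small self-contained check against the standard Iceberg types API:

    import org.apache.iceberg.types.Types;

    // MapType exposes key and value as nested fields: index 0 is the key,
    // index 1 is the value. The old code used index 0 for both.
    Types.MapType map =
        Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.LongType.get());

    assert map.fields().get(0).fieldId() == map.keyId(); // 1
    assert map.fields().get(1).fieldId() == map.valueId(); // 2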
@@ -18,11 +18,49 @@
*/
package io.tabular.iceberg.connect.data;

+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Type.PrimitiveType;

public class SchemaUpdate {

+  public static class Consumer {
+    private final Map<String, AddColumn> addColumns = Maps.newHashMap();
+    private final Map<String, UpdateType> updateTypes = Maps.newHashMap();
+    private final Map<String, MakeOptional> makeOptionals = Maps.newHashMap();
+
+    public Collection<AddColumn> addColumns() {
+      return addColumns.values();
+    }
+
+    public Collection<UpdateType> updateTypes() {
+      return updateTypes.values();
+    }
+
+    public Collection<MakeOptional> makeOptionals() {
+      return makeOptionals.values();
+    }
+
+    public boolean empty() {
+      return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty();
+    }
+
+    public void addColumn(String parentName, String name, Type type) {
+      AddColumn addCol = new AddColumn(parentName, name, type);
+      addColumns.put(addCol.key(), addCol);
+    }
+
+    public void updateType(String name, PrimitiveType type) {
+      updateTypes.put(name, new UpdateType(name, type));
+    }
+
+    public void makeOptional(String name) {
+      makeOptionals.put(name, new MakeOptional(name));
+    }
+  }

public static class AddColumn extends SchemaUpdate {
private final String parentName;
private final String name;
@@ -42,6 +80,10 @@ public String name() {
return name;
}

+public String key() {
+return parentName == null ? name : parentName + "." + name;
+}

public Type type() {
return type;
}
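A quick usage sketch of the new Consumer (illustrative only; the column name "addresses.element" assumes Iceberg's naming for list element fields): repeated updates for the same column collapse into one entry, which is what makes per-element visits over collections safe.

    import org.apache.iceberg.types.Types;

    SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();

    // Two list elements both carry the unknown field "zip"; both calls land on
    // the same map key "addresses.element.zip", so only one update is pending.
    updates.addColumn("addresses.element", "zip", Types.StringType.get());
    updates.addColumn("addresses.element", "zip", Types.StringType.get());

    assert updates.addColumns().size() == 1;
    assert !updates.empty();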
@@ -82,8 +82,8 @@ public static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Schema
return null;
}

-public static void applySchemaUpdates(Table table, List<SchemaUpdate> updates) {
-if (updates == null || updates.isEmpty()) {
+public static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
+if (updates == null || updates.empty()) {
// no updates to apply
return;
}
@@ -93,31 +93,25 @@ public static void applySchemaUpdates(Table table, List<SchemaUpdate> updates) {
.run(notUsed -> commitSchemaUpdates(table, updates));
}

-private static void commitSchemaUpdates(Table table, List<SchemaUpdate> updates) {
+private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
// get the latest schema in case another process updated it
table.refresh();

// filter out columns that have already been added
List<AddColumn> addColumns =
-updates.stream()
-.filter(update -> update instanceof AddColumn)
-.map(update -> (AddColumn) update)
+updates.addColumns().stream()
.filter(addCol -> !columnExists(table.schema(), addCol))
.collect(toList());

// filter out columns that have the updated type
List<UpdateType> updateTypes =
-updates.stream()
-.filter(update -> update instanceof UpdateType)
-.map(update -> (UpdateType) update)
+updates.updateTypes().stream()
.filter(updateType -> !typeMatches(table.schema(), updateType))
.collect(toList());

// filter out columns that have already been made optional
List<MakeOptional> makeOptionals =
-updates.stream()
-.filter(update -> update instanceof MakeOptional)
-.map(update -> (MakeOptional) update)
+updates.makeOptionals().stream()
.filter(makeOptional -> !isOptional(table.schema(), makeOptional))
.collect(toList());

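The remainder of commitSchemaUpdates is folded out of this view. For orientation, a hedged sketch of what the three filtered lists presumably feed into, namely Iceberg's UpdateSchema API; the accessors parentName() and type() on the update classes are assumed here, since only name(), key(), and type() on AddColumn are visible above.

    import org.apache.iceberg.UpdateSchema;

    // Sketch only: apply the de-duplicated, still-needed updates in one commit.
    UpdateSchema updateSchema = table.updateSchema();
    addColumns.forEach(col -> updateSchema.addColumn(col.parentName(), col.name(), col.type()));
    updateTypes.forEach(u -> updateSchema.updateColumn(u.name(), u.type()));
    makeOptionals.forEach(m -> updateSchema.makeColumnOptional(m.name()));
    updateSchema.commit();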