[LI] Add feature for Spark ORC reader to ignore field ids in files by using a new table property (#134)

* Revert "Add logic to derive partition column id from partition.column.ids pro… (#122)"

This reverts commit 21c3a80.

* [LI] Add feature for Spark ORC reader to ignore field ids in files by using a new table property
rzhang10 authored Dec 14, 2022
1 parent 00b370e commit 976277d
Showing 9 changed files with 87 additions and 60 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -184,4 +184,7 @@ private TableProperties() {

public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;

public static final String READ_ORC_IGNORE_FILE_FIELD_IDS = "read.orc.ignore.field-ids.enabled";
public static final boolean READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT = false;
}
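
The new flag is an ordinary table property, so it can be enabled without touching the reader code. A minimal sketch, assuming a table already loaded from a catalog (the helper class and method names are illustrative, not part of this commit):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    // Illustrative helper: turn the new read option on for an existing table.
    public class EnableIgnoreFileFieldIds {
      public static void enable(Table table) {
        table.updateProperties()
            .set(TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS, "true") // "read.orc.ignore.field-ids.enabled"
            .commit();
      }
    }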
@@ -19,7 +19,6 @@

package org.apache.iceberg.hivelink.core;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,7 +42,6 @@
import org.apache.iceberg.hivelink.core.utils.HiveTypeUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -90,8 +88,7 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
Types.StructType dataStructType = schema.asStruct();
List<Types.NestedField> fields = Lists.newArrayList(dataStructType.fields());

String partitionColumnIdMappingString = props.get("partition.column.ids");
Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema, partitionColumnIdMappingString);
Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema);
Types.StructType partitionStructType = partitionSchema.asStruct();
fields.addAll(partitionStructType.fields());
return new Schema(fields);
@@ -110,8 +107,7 @@ static StructTypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
}

private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema, String idMapping) {
Map<String, Integer> nameToId = parsePartitionColId(idMapping);
private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
AtomicInteger fieldId = new AtomicInteger(10000);
List<Types.NestedField> partitionFields = Lists.newArrayList();
partitionKeys.forEach(f -> {
@@ -121,39 +117,11 @@ private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema da
}
partitionFields.add(
Types.NestedField.optional(
nameToId.containsKey(f.getName()) ? nameToId.get(f.getName()) : fieldId.incrementAndGet(),
f.getName(), primitiveIcebergType(f.getType()), f.getComment()));
fieldId.incrementAndGet(), f.getName(), primitiveIcebergType(f.getType()), f.getComment()));
});
return new Schema(partitionFields);
}

/**
*
* @param idMapping A comma separated string representation of column name
* and its id, e.g. partitionCol1:10,partitionCol2:11, no
* whitespace is allowed in the middle
* @return The parsed in-mem Map representation of the name to
* id mapping
*/
private static Map<String, Integer> parsePartitionColId(String idMapping) {
Map<String, Integer> nameToId = Maps.newHashMap();
if (idMapping != null) {
// parse idMapping string
Arrays.stream(idMapping.split(",")).forEach(kv -> {
String[] split = kv.split(":");
if (split.length != 2) {
throw new IllegalStateException(String.format(
"partition.column.ids property is invalid format: %s",
idMapping));
}
String name = split[0];
Integer id = Integer.parseInt(split[1]);
nameToId.put(name, id);
});
}
return nameToId;
}

private static Type primitiveIcebergType(String hiveTypeString) {
PrimitiveTypeInfo primitiveTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(hiveTypeString);
return HiveTypeUtil.convert(primitiveTypeInfo);
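With the partition.column.ids parsing reverted, partition columns simply receive the next synthetic field id above 10000, in declaration order. A standalone sketch of that assignment (the string-typed partition columns are an assumption made for brevity):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.apache.iceberg.types.Types;

    // Sketch of the simplified id assignment: 10001, 10002, ... per partition column.
    public class PartitionIdSketch {
      public static Schema partitionSchema(List<String> partitionColumnNames) {
        AtomicInteger fieldId = new AtomicInteger(10000);
        List<Types.NestedField> fields = Lists.newArrayList();
        partitionColumnNames.forEach(name ->
            fields.add(Types.NestedField.optional(
                fieldId.incrementAndGet(), name, Types.StringType.get())));
        return new Schema(fields);
      }
    }
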
9 changes: 8 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -137,6 +137,8 @@ public static class ReadBuilder {
private NameMapping nameMapping = null;
private OrcRowFilter rowFilter = null;

private boolean ignoreFileFieldIds = false;

private Function<TypeDescription, OrcRowReader<?>> readerFunc;
private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE;
@@ -217,10 +219,15 @@ public ReadBuilder rowFilter(OrcRowFilter newRowFilter) {
return this;
}

public ReadBuilder setIgnoreFileFieldIds(boolean ignoreFileFieldIds) {
this.ignoreFileFieldIds = ignoreFileFieldIds;
return this;
}

public <D> CloseableIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
return new OrcIterable<>(file, conf, schema, nameMapping, start, length, readerFunc, caseSensitive, filter,
batchedReaderFunc, recordsPerBatch, rowFilter);
batchedReaderFunc, recordsPerBatch, rowFilter, ignoreFileFieldIds);
}
}

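For callers that use the ORC read builder directly, the flag is threaded in through the new setter. A hedged sketch, with the input file, projection, and row-reader function supplied by the caller (none of these values come from this commit):

    import java.util.function.Function;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.orc.ORC;
    import org.apache.iceberg.orc.OrcRowReader;
    import org.apache.orc.TypeDescription;

    // Sketch: build an ORC iterable that resolves columns as if the file carried no field ids.
    public class OrcReadSketch {
      public static <D> CloseableIterable<D> open(
          InputFile file, Schema projection, Function<TypeDescription, OrcRowReader<?>> readerFunc) {
        return ORC.read(file)
            .project(projection)          // build() requires a schema
            .createReaderFunc(readerFunc)
            .setIgnoreFileFieldIds(true)  // new setter added in this commit
            .build();
      }
    }
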
7 changes: 5 additions & 2 deletions orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -60,11 +60,13 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private NameMapping nameMapping;
private final OrcRowFilter rowFilter;

private final boolean ignoreFileFieldIds;

OrcIterable(InputFile file, Configuration config, Schema schema,
NameMapping nameMapping, Long start, Long length,
Function<TypeDescription, OrcRowReader<?>> readerFunction, boolean caseSensitive, Expression filter,
Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction, int recordsPerBatch,
OrcRowFilter rowFilter) {
OrcRowFilter rowFilter, boolean ignoreFileFieldIds) {
this.schema = schema;
this.readerFunction = readerFunction;
this.file = file;
@@ -77,6 +79,7 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
this.batchReaderFunction = batchReaderFunction;
this.recordsPerBatch = recordsPerBatch;
this.rowFilter = rowFilter;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@SuppressWarnings("unchecked")
Expand All @@ -88,7 +91,7 @@ public CloseableIterator<T> iterator() {
TypeDescription fileSchema = orcFileReader.getSchema();
final TypeDescription readOrcSchema;
final TypeDescription fileSchemaWithIds;
if (ORCSchemaUtil.hasIds(fileSchema)) {
if (!ignoreFileFieldIds && ORCSchemaUtil.hasIds(fileSchema)) {
fileSchemaWithIds = fileSchema;
} else {
if (nameMapping == null) {
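The effect of the flag is confined to this branch: when it is set, the embedded ids are never consulted, and the reader falls back to the same name-mapping path already used for files without ids (the table's configured name mapping if present, otherwise a mapping derived from the expected read schema). A compact restatement of the decision, with booleans standing in for the real ORC types:

    // Not library code: a boolean restatement of the branch in OrcIterable.iterator().
    public class FieldIdResolution {
      enum Source { FILE_IDS, TABLE_NAME_MAPPING, MAPPING_FROM_READ_SCHEMA }

      static Source resolve(boolean ignoreFileFieldIds, boolean fileHasIds, boolean hasNameMapping) {
        if (!ignoreFileFieldIds && fileHasIds) {
          return Source.FILE_IDS;                  // trust ids embedded in the ORC file
        } else if (hasNameMapping) {
          return Source.TABLE_NAME_MAPPING;        // apply the configured name mapping
        } else {
          return Source.MAPPING_FROM_READ_SCHEMA;  // derive a mapping from the expected schema
        }
      }
    }
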
@@ -47,15 +47,17 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final String nameMapping;
private final boolean caseSensitive;
private final int batchSize;
private final boolean ignoreFileFieldIds;

BatchDataReader(
CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
EncryptionManager encryptionManager, boolean caseSensitive, int size) {
EncryptionManager encryptionManager, boolean caseSensitive, int size, boolean ignoreFileFieldIds) {
super(task, fileIo, encryptionManager);
this.expectedSchema = expectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.batchSize = size;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@Override
Expand Down Expand Up @@ -98,7 +100,8 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
idToConstant))
.recordsPerBatch(batchSize)
.filter(task.residual())
.caseSensitive(caseSensitive);
.caseSensitive(caseSensitive)
.setIgnoreFileFieldIds(ignoreFileFieldIds);

if (nameMapping != null) {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
@@ -71,16 +71,18 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private final Schema expectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
private final boolean ignoreFileFieldIds;

RowDataReader(
CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
EncryptionManager encryptionManager, boolean caseSensitive, boolean ignoreFileFieldIds) {
super(task, io, encryptionManager);
this.io = io;
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@Override
@@ -185,7 +187,8 @@ private CloseableIterable<InternalRow> newOrcIterable(
.createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
.filter(task.residual())
.caseSensitive(caseSensitive)
.rowFilter(orcRowFilter);
.rowFilter(orcRowFilter)
.setIgnoreFileFieldIds(ignoreFileFieldIds);

if (nameMapping != null) {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
@@ -50,6 +50,8 @@
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;

public class RowDataRewriter implements Serializable {

@@ -64,6 +66,7 @@ public class RowDataRewriter implements Serializable {
private final LocationProvider locations;
private final String nameMapping;
private final boolean caseSensitive;
private final boolean ignoreFileFieldIds;

public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
@@ -80,6 +83,11 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
String formatString = table.properties().getOrDefault(
TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));

this.ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
table.properties(),
READ_ORC_IGNORE_FILE_FIELD_IDS,
READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
}

public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
Expand All @@ -96,7 +104,7 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
long taskId = context.taskAttemptId();

RowDataReader dataReader = new RowDataReader(
task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive, ignoreFileFieldIds);

StructType structType = SparkSchemaUtil.convert(schema);
SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
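The rewriter resolves the flag once from table properties and passes it to every RowDataReader it creates. A small sketch of the lookup semantics (the main method is only for illustration): an absent property falls back to the default, false.

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.util.PropertyUtil;

    import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
    import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;

    // Sketch of the property lookup used above.
    public class PropertyLookupSketch {
      static boolean ignoreFileFieldIds(Map<String, String> tableProps) {
        return PropertyUtil.propertyAsBoolean(
            tableProps, READ_ORC_IGNORE_FILE_FIELD_IDS, READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);
      }

      public static void main(String[] args) {
        System.out.println(ignoreFileFieldIds(ImmutableMap.of()));                                       // false
        System.out.println(ignoreFileFieldIds(ImmutableMap.of(READ_ORC_IGNORE_FILE_FIELD_IDS, "true"))); // true
      }
    }
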
45 changes: 32 additions & 13 deletions spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -77,6 +77,8 @@
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS;
import static org.apache.iceberg.TableProperties.READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT;

class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
SupportsPushDownRequiredColumns, SupportsReportStatistics {
@@ -223,6 +225,10 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
table.properties(),
READ_ORC_IGNORE_FILE_FIELD_IDS,
READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);

ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
"Cannot scan table %s: cannot apply required delete files", table);
@@ -231,7 +237,7 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
localityPreferred, new BatchReaderFactory(batchSize)));
localityPreferred, new BatchReaderFactory(batchSize), ignoreFileFieldIds));
}
LOG.info("Batching input partitions with {} tasks.", readTasks.size());

@@ -246,12 +252,16 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
boolean ignoreFileFieldIds = PropertyUtil.propertyAsBoolean(
table.properties(),
READ_ORC_IGNORE_FILE_FIELD_IDS,
READ_ORC_IGNORE_FILE_FIELD_IDS_DEFAULT);

List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
localityPreferred, InternalRowReaderFactory.INSTANCE));
localityPreferred, InternalRowReaderFactory.INSTANCE, ignoreFileFieldIds));
}

return readTasks;
@@ -455,13 +465,15 @@ private static class ReadTask<T> implements Serializable, InputPartition<T> {
private final boolean localityPreferred;
private final ReaderFactory<T> readerFactory;

private final boolean ignoreFileFieldIds;
private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
private transient String[] preferredLocations = null;

private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory,
boolean ignoreFileFieldIds) {
this.task = task;
this.tableSchemaString = tableSchemaString;
this.expectedSchemaString = expectedSchemaString;
@@ -472,12 +484,13 @@ private ReadTask(CombinedScanTask task, String tableSchemaString, String expecte
this.preferredLocations = getPreferredLocations();
this.readerFactory = readerFactory;
this.nameMappingString = nameMappingString;
this.ignoreFileFieldIds = ignoreFileFieldIds;
}

@Override
public InputPartitionReader<T> createPartitionReader() {
return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
encryptionManager.value(), caseSensitive);
encryptionManager.value(), caseSensitive, ignoreFileFieldIds);
}

@Override
@@ -513,7 +526,8 @@ private String[] getPreferredLocations() {
private interface ReaderFactory<T> extends Serializable {
InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive);
EncryptionManager encryptionManager, boolean caseSensitive,
boolean ignoreFileFieldIds);
}

private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
@@ -525,8 +539,10 @@ private InternalRowReaderFactory() {
@Override
public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
EncryptionManager encryptionManager, boolean caseSensitive,
boolean ignoreFileFieldIds) {
return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive,
ignoreFileFieldIds);
}
}

@@ -540,22 +556,25 @@ private static class BatchReaderFactory implements ReaderFactory<ColumnarBatch>
@Override
public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
EncryptionManager encryptionManager, boolean caseSensitive,
boolean ignoreFileFieldIds) {
return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize,
ignoreFileFieldIds);
}
}

private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
EncryptionManager encryptionManager, boolean caseSensitive, boolean ignoreFileFieldIds) {
super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, ignoreFileFieldIds);
}
}

private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive, int size) {
super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
EncryptionManager encryptionManager, boolean caseSensitive, int size,
boolean ignoreFileFieldIds) {
super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size, ignoreFileFieldIds);
}
}
}
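
End to end, no reader option is needed: planInputPartitions() and planBatchInputPartitions() read the property from the table and carry it through ReadTask into the row and batch readers, so a plain Spark read picks it up. A sketch with a placeholder table identifier:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Sketch only: an ordinary Spark 2 read; "db.orc_table" is a placeholder identifier.
    public class ReadWithIgnoredFileIds {
      public static Dataset<Row> read(SparkSession spark) {
        return spark.read()
            .format("iceberg")
            .load("db.orc_table");
      }
    }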