Spark: Read DVs when reading from .position_deletes table
nastra committed Nov 26, 2024
1 parent 430ebff commit 88ff4d3
Showing 2 changed files with 168 additions and 3 deletions.
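For context, the position_deletes metadata table is queried from Spark like any other Iceberg metadata table; with this change, position deletes stored as deletion vectors (DVs) in Puffin files are surfaced through that table as well. A minimal sketch of such a query, assuming an already-configured catalog named "demo" and a table "db.tbl" that has DVs (both names are illustrative placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PositionDeletesQueryExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("position-deletes-example").getOrCreate();

    // file_path/pos identify the deleted row, delete_file_path points at the delete
    // file (for DVs, the Puffin file); "demo.db.tbl" is a placeholder table name
    Dataset<Row> deletes =
        spark.sql("SELECT file_path, pos, delete_file_path FROM demo.db.tbl.position_deletes");
    deletes.show(false);
  }
}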
org/apache/iceberg/spark/source/DVIterable.java
@@ -0,0 +1,147 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

class DVIterable extends CloseableGroup implements CloseableIterable<InternalRow> {
  private final Puffin.ReadBuilder builder;
  private final PartitionSpec spec;
  private final DeleteFile deleteFile;
  private final Schema projection;

  DVIterable(InputFile inputFile, DeleteFile deleteFile, PartitionSpec spec, Schema projection) {
    this.deleteFile = deleteFile;
    this.builder = Puffin.read(inputFile);
    this.spec = spec;
    this.projection = projection;
  }

  @Override
  public CloseableIterator<InternalRow> iterator() {
    PuffinReader reader = builder.build();
    // track the reader so that closing this iterable also closes it
    addCloseable(reader);
    return new DVIterator(reader);
  }

  private class DVIterator implements CloseableIterator<InternalRow> {
    private final PuffinReader reader;
    private final Iterator<Pair<BlobMetadata, ByteBuffer>> blobs;
    private Pair<BlobMetadata, ByteBuffer> currentBlob;
    private Iterator<Long> positions;

    DVIterator(PuffinReader reader) {
      this.reader = reader;
      try {
        blobs = reader.readAll(reader.fileMetadata().blobs()).iterator();
        advanceBlob();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    private void advanceBlob() {
      if (blobs.hasNext()) {
        this.currentBlob = blobs.next();
        // deserialize the next DV blob into the positions it marks as deleted
        List<Long> pos = Lists.newArrayList();
        PositionDeleteIndex.deserialize(currentBlob.second().array(), deleteFile).forEach(pos::add);
        this.positions = pos.iterator();
      } else {
        this.currentBlob = null;
      }
    }

    @Override
    public boolean hasNext() {
      boolean hasNextPosition = positions.hasNext();
      if (!hasNextPosition) {
        advanceBlob();
      }

      return null != currentBlob && hasNextPosition;
    }

    @Override
    public InternalRow next() {
      long position = positions.next();

      List<Object> values = Lists.newArrayList();
      if (null != projection.findField(MetadataColumns.DELETE_FILE_PATH.fieldId())) {
        values.add(UTF8String.fromString(deleteFile.referencedDataFile()));
      }

      if (null != projection.findField(MetadataColumns.DELETE_FILE_POS.fieldId())) {
        values.add(position);
      }

      if (null != projection.findField(MetadataColumns.DELETE_FILE_ROW_FIELD_ID)) {
        // we don't have info about deleted rows with DVs, so always return null
        values.add(null);
      }

      if (null != projection.findField(MetadataColumns.PARTITION_COLUMN_ID)) {
        StructInternalRow partition = new StructInternalRow(spec.partitionType());
        partition.setStruct(deleteFile.partition());
        values.add(partition);
      }

      if (null != projection.findField(MetadataColumns.SPEC_ID_COLUMN_ID)) {
        values.add(deleteFile.specId());
      }

      if (null != projection.findField(MetadataColumns.FILE_PATH_COLUMN_ID)) {
        values.add(UTF8String.fromString(deleteFile.location()));
      }

      return new GenericInternalRow(values.toArray());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove is not supported");
    }

    @Override
    public void close() throws IOException {
      if (null != reader) {
        reader.close();
      }
    }
  }
}
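For illustration, the new class is consumed like any other CloseableIterable: build it from the Puffin InputFile, the DV's DeleteFile entry, the partition spec, and the projected schema, then iterate and close. A minimal usage sketch, assuming inputFile, deleteFile, spec, and projection have already been resolved from a scan task (as PositionDeletesRowReader does in the second file below):

// Hypothetical caller; inputFile, deleteFile, spec, and projection are assumed to be
// in scope, e.g. taken from a PositionDeletesScanTask.
try (DVIterable dvRows = new DVIterable(inputFile, deleteFile, spec, projection)) {
  // iterating yields one InternalRow per deleted position recorded in the DV;
  // the columns present depend on the projected schema passed in above
  for (InternalRow row : dvRows) {
    System.out.println(row);
  }
} // closing the iterable also closes any PuffinReader registered via addCloseable()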
org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -18,6 +18,7 @@
 */
package org.apache.iceberg.spark.source;

+import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -29,10 +30,13 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,6 +48,7 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
    implements PartitionReader<InternalRow> {

  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+  private final CloseableGroup closeableGroup = new CloseableGroup();

  PositionDeletesRowReader(SparkInputPartition partition) {
    this(
@@ -90,15 +95,23 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
        ExpressionUtil.extractByIdInclusive(
            task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));

-    return newIterable(
+    if (ContentFileUtil.isDV(task.file())) {
+      DVIterable dvIterable = new DVIterable(inputFile, task.file(), task.spec(), expectedSchema());
+      closeableGroup.addCloseable(dvIterable);
+      return dvIterable.iterator();
+    }
+
+    CloseableIterable<InternalRow> iterable =
+        newIterable(
            inputFile,
            task.file().format(),
            task.start(),
            task.length(),
            residualWithoutConstants,
            expectedSchema(),
-            idToConstant)
-        .iterator();
+            idToConstant);
+    closeableGroup.addCloseable(iterable);
+    return iterable.iterator();
  }

  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
@@ -108,4 +121,9 @@ private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
        .filter(id -> !idToConstant.containsKey(id))
        .collect(Collectors.toSet());
  }
+
+  @Override
+  public void close() throws IOException {
+    closeableGroup.close();
+  }
}
