Commit 6aaedff

updates

nastra committed Nov 19, 2024 · 1 parent e3cec13
Showing 11 changed files with 189 additions and 46 deletions.
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -400,6 +400,13 @@ public Transaction newTransaction() {

@Override
public StaticTableOperations operations() {
if (lazyTable() instanceof BaseTable) {
BaseTable table = (BaseTable) lazyTable();
return (StaticTableOperations) table.operations();
} else if (lazyTable() instanceof BaseMetadataTable) {
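// assumes a metadata table does not expose StaticTableOperations itself, so the operations of its underlying table() are used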
BaseMetadataTable table = (BaseMetadataTable) lazyTable();
return (StaticTableOperations) table.table().operations();
}
return (StaticTableOperations) ((BaseTable) lazyTable()).operations();
}

@@ -30,6 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.ScanTaskUtil;

/**
* Container class representing a set of position delete files to be rewritten by a {@link
@@ -109,7 +110,7 @@ public long rewrittenBytes() {
}

public long addedBytes() {
return addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
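// ScanTaskUtil::contentSizeInBytes is assumed to count only the DV content size for deletion vectors stored in Puffin files, rather than the full file size reported by DeleteFile::fileSizeInBytes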
return addedDeleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum();
}

public int numRewrittenDeleteFiles() {
@@ -108,18 +108,6 @@ public long cardinality() {
return bitmap.cardinality();
}

@Override
public List<Long> positions() {
List<Long> positions = Lists.newArrayListWithCapacity((int) cardinality());
forEach(
pos -> {
if (isDeleted(pos)) {
positions.add(pos);
}
});
return positions;
}

/**
* Serializes the index using the following format:
*
@@ -20,7 +20,6 @@

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.function.LongConsumer;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -118,13 +117,4 @@ static PositionDeleteIndex deserialize(byte[] bytes, DeleteFile deleteFile) {
static PositionDeleteIndex empty() {
return EmptyPositionDeleteIndex.get();
}

/**
* Returns all the set positions
*
* @return A list of all the set positions
*/
default List<Long> positions() {
throw new UnsupportedOperationException(getClass().getName() + " does not support positions");
}
}
62 changes: 62 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.function.Function;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * PartitioningDVWriter is a {@link PartitioningWriter} implementation that writes deletion
 * vectors (DVs) for the deleted positions of data files.
 */
public class PartitioningDVWriter<T>
implements PartitioningWriter<PositionDelete<T>, DeleteWriteResult> {
private final DVFileWriter fileWriter;
private DeleteWriteResult result;

public PartitioningDVWriter(
OutputFileFactory fileFactory,
Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
this.fileWriter = new BaseDVFileWriter(fileFactory, loadPreviousDeletes::apply);
}

@Override
public void write(PositionDelete<T> row, PartitionSpec spec, StructLike partition) {
fileWriter.delete(row.path().toString(), row.pos(), spec, partition);
}

@Override
public DeleteWriteResult result() {
Preconditions.checkState(result != null, "Cannot get result from unclosed writer");
return result;
}

@Override
public void close() throws IOException {
fileWriter.close();
this.result = fileWriter.result();
}
}
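
A minimal usage sketch, not part of the commit, showing how the new writer might be driven end to end. The wrapper class, the helper name, the Void row type, and the way positions are supplied are assumptions; the Iceberg classes and calls are the ones used above.

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningDVWriter;

class PartitioningDVWriterSketch {
  static List<DeleteFile> writeDvs(
      OutputFileFactory fileFactory,
      PartitionSpec spec,
      StructLike partition,
      CharSequence dataFile,
      long[] deletedPositions)
      throws IOException {
    // path -> null: this sketch does not load any previously written DVs
    PartitioningDVWriter<Void> writer = new PartitioningDVWriter<>(fileFactory, path -> null);
    PositionDelete<Void> delete = PositionDelete.create();
    for (long pos : deletedPositions) {
      delete.set(dataFile, pos, null /* deleted row is not tracked for DVs */);
      writer.write(delete, spec, partition);
    }
    writer.close(); // close() populates result()
    return writer.result().deleteFiles(); // the DVs, written as Puffin delete files
  }
}

Note that result() may only be read after close(), matching the Preconditions check in the class above.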
@@ -37,11 +37,14 @@

import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
@@ -216,9 +219,25 @@ public FileFormat deleteFileFormat() {
confParser
.stringConf()
.option(SparkWriteOptions.DELETE_FORMAT)
.tableProperty(TableProperties.DELETE_DEFAULT_FILE_FORMAT)
.tableProperty(
formatVersion() >= 3
? FileFormat.PUFFIN.name()
: TableProperties.DELETE_DEFAULT_FILE_FORMAT)
.parseOptional();
return valueAsString != null ? FileFormat.fromString(valueAsString) : dataFileFormat();
return valueAsString != null
? FileFormat.fromString(valueAsString)
: formatVersion() >= 3 ? FileFormat.PUFFIN : dataFileFormat();
}

private int formatVersion() {
if (table instanceof HasTableOperations) {
TableOperations ops = ((HasTableOperations) table).operations();
return ops.current().formatVersion();
} else if (table instanceof BaseMetadataTable) {
return ((BaseMetadataTable) table).table().operations().current().formatVersion();
} else {
return 2;
}
}
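
// A minimal sketch, not from the commit, restating how deleteFileFormat() and formatVersion()
// above are expected to combine: an explicit delete format still wins, and format-version-3
// tables fall back to Puffin instead of the data file format. The helper name and the String
// parameter are made up for illustration.
private FileFormat resolvedDeleteFileFormat(String explicitFormat, FileFormat dataFileFormat) {
  if (explicitFormat != null) {
    return FileFormat.fromString(explicitFormat); // explicit write option or table property
  }
  // v3 tables default to deletion vectors stored in Puffin files
  return formatVersion() >= 3 ? FileFormat.PUFFIN : dataFileFormat;
}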

private String deleteCompressionCodec() {
@@ -197,7 +197,7 @@ private RewritePositionDeletesGroup rewriteDeleteFiles(
withJobGroupInfo(
newJobGroupInfo("REWRITE-POSITION-DELETES", desc),
() -> rewriter.rewrite(fileGroup.tasks()));
// TODO: this should return DVs with format version 3

fileGroup.setOutputFiles(addedFiles);
LOG.info("Rewrite position deletes ready to be committed - {}", desc);
return fileGroup;
@@ -114,8 +114,6 @@ protected void doRewrite(String groupId, List<PositionDeletesScanTask> group) {
.equalTo(dataFiles.col(MetadataColumns.DELETE_FILE_PATH.name()));
Dataset<Row> validDeletes = posDeletes.join(dataFiles, joinCond, "leftsemi");

// TODO: this writes V2 position deletes
// we need to rewrite those to V3 deletes here before bin-packing the V3 deletes
// write the packed deletes into new files where each split becomes a new file
validDeletes
.sortWithinPartitions(
@@ -34,7 +34,6 @@
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -63,24 +62,38 @@ public CloseableIterator<InternalRow> iterator() {

private class PuffinIterator implements CloseableIterator<InternalRow> {
private final PuffinReader puffin;
private final Iterator<Long> positions;
private final Iterator<Pair<BlobMetadata, ByteBuffer>> blobs;
private Pair<BlobMetadata, ByteBuffer> currentBlob;
private Iterator<Long> positions;

PuffinIterator(PuffinReader puffin) {
this.puffin = puffin;
try {
Pair<BlobMetadata, ByteBuffer> current =
Iterables.getOnlyElement(puffin.readAll(puffin.fileMetadata().blobs()));
PositionDeleteIndex index =
PositionDeleteIndex.deserialize(current.second().array(), deleteFile);
positions = index.positions().iterator();
blobs = puffin.readAll(puffin.fileMetadata().blobs()).iterator();
advanceBlob();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void advanceBlob() {
if (blobs.hasNext()) {
this.currentBlob = blobs.next();
List<Long> pos = Lists.newArrayList();
PositionDeleteIndex.deserialize(currentBlob.second().array(), deleteFile).forEach(pos::add);
this.positions = pos.iterator();
} else {
this.currentBlob = null;
}
}

@Override
public boolean hasNext() {
return positions.hasNext();
// advance to the next blob whenever the current blob's positions are exhausted
while (null != currentBlob && !positions.hasNext()) {
advanceBlob();
}
return null != currentBlob && positions.hasNext();
}

@Override
@@ -95,7 +108,7 @@ public InternalRow next() {
values.add(position);
}
if (null != projection.findField(MetadataColumns.DELETE_FILE_ROW_FIELD_ID)) {
values.add(null); // TODO actually deleted row info
values.add(null); // TODO deleted row info
}
if (null != projection.findField(MetadataColumns.PARTITION_COLUMN_ID)) {
StructInternalRow partition = new StructInternalRow(spec.partitionType());
@@ -22,19 +22,23 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PositionDeletesTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningDVWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
@@ -306,9 +310,11 @@ private static class DeleteWriter implements DataWriter<InternalRow> {
private final int rowOrdinal;
private final int rowSize;
private final StructLike partition;
private final int formatVersion;

private ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
private PartitioningDVWriter<InternalRow> dvWriter;
private boolean closed = false;

@@ -344,6 +350,7 @@ private static class DeleteWriter implements DataWriter<InternalRow> {
this.io = table.io();
this.spec = table.specs().get(specId);
this.partition = partition;
this.formatVersion = formatVersion(table);

this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name());
this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name());
@@ -355,17 +362,36 @@ private static class DeleteWriter implements DataWriter<InternalRow> {
this.rowSize = ((StructType) type).size();
}

private int formatVersion(Table table) {
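// assumed fallback: tables that expose neither TableOperations nor an underlying base table are treated as format v2, so the existing position-delete writers are used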
if (table instanceof HasTableOperations) {
TableOperations ops = ((HasTableOperations) table).operations();
return ops.current().formatVersion();
} else if (table instanceof BaseMetadataTable) {
return ((BaseMetadataTable) table).table().operations().current().formatVersion();
} else {
return 2;
}
}

@Override
public void write(InternalRow record) throws IOException {
String file = record.getString(fileOrdinal);
long position = record.getLong(positionOrdinal);
InternalRow row = record.getStruct(rowOrdinal, rowSize);
if (row != null) {
positionDelete.set(file, position, row);
lazyWriterWithRow().write(positionDelete, spec, partition);
if (formatVersion >= 3) {
lazyDvWriter().write(positionDelete, spec, partition);
} else {
lazyWriterWithRow().write(positionDelete, spec, partition);
}
} else {
positionDelete.set(file, position, null);
lazyWriterWithoutRow().write(positionDelete, spec, partition);
if (formatVersion >= 3) {
lazyDvWriter().write(positionDelete, spec, partition);
} else {
lazyWriterWithoutRow().write(positionDelete, spec, partition);
}
}
}

@@ -390,6 +416,9 @@ public void close() throws IOException {
if (writerWithoutRow != null) {
writerWithoutRow.close();
}
if (dvWriter != null) {
dvWriter.close();
}
this.closed = true;
}
}
@@ -412,6 +441,13 @@ private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithoutRow() {
return writerWithoutRow;
}

private PartitioningDVWriter<InternalRow> lazyDvWriter() {
if (dvWriter == null) {
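// p -> null: no previously written DVs are loaded here; the assumption is that the rewrite inputs already carry the existing deletes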
this.dvWriter = new PartitioningDVWriter<>(deleteFileFactory, p -> null);
}
return dvWriter;
}

private List<DeleteFile> allDeleteFiles() {
List<DeleteFile> allDeleteFiles = Lists.newArrayList();
if (writerWithRow != null) {
@@ -420,6 +456,9 @@ private List<DeleteFile> allDeleteFiles() {
if (writerWithoutRow != null) {
allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles());
}
if (dvWriter != null) {
allDeleteFiles.addAll(dvWriter.result().deleteFiles());
}
return allDeleteFiles;
}
}