Fix error when building entries system table after position deletion
chenjian2664 authored and ebyhr committed Feb 4, 2025
1 parent 87cd275 · commit 3efd834
Showing 2 changed files with 223 additions and 11 deletions.
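
A minimal reproduction, reconstructed from the new tests added below (the table name is illustrative): a row-level DELETE writes a position delete file, and querying the $entries metadata table afterwards failed before this fix.

CREATE TABLE test_entries AS SELECT 1 id, DATE '2014-01-01' dt;
DELETE FROM test_entries WHERE id = 1;          -- writes a position delete file
SELECT * FROM "test_entries$entries";           -- raised an error before this change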
@@ -61,6 +61,8 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;

@@ -202,13 +204,26 @@ private void appendDataFile(RowBlockBuilder blockBuilder, StructProjection dataF
Map<Integer, Long> nanValueCounts = dataFile.get(++position, Map.class);
appendIntegerBigintMap((MapBlockBuilder) fieldBuilders.get(position), nanValueCounts);

//noinspection unchecked
Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);
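// Bounds of a position delete file describe the reserved pos/file_path metadata
// columns rather than table columns, so they cannot go through the schema-based
// rendering used for data and equality delete files and are handled separately below.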
switch (ContentType.of(content)) {
case DATA, EQUALITY_DELETE -> {
//noinspection unchecked
Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);

//noinspection unchecked
Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
//noinspection unchecked
Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
}
case POSITION_DELETE -> {
//noinspection unchecked
Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
appendBoundsForPositionDelete((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);

//noinspection unchecked
Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
appendBoundsForPositionDelete((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
}
}

ByteBuffer keyMetadata = dataFile.get(++position, ByteBuffer.class);
if (keyMetadata == null) {
@@ -222,12 +237,30 @@
List<Long> splitOffsets = dataFile.get(++position, List.class);
appendBigintArray((ArrayBlockBuilder) fieldBuilders.get(position), splitOffsets);

//noinspection unchecked
List<Long> equalityIds = dataFile.get(++position, List.class);
appendBigintArray((ArrayBlockBuilder) fieldBuilders.get(position), equalityIds);
switch (ContentType.of(content)) {
case DATA -> {
// data files don't have equality ids
fieldBuilders.get(++position).appendNull();

Integer sortOrderId = dataFile.get(++position, Integer.class);
INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
Integer sortOrderId = dataFile.get(++position, Integer.class);
INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
}
case POSITION_DELETE -> {
// position delete files don't have equality ids
fieldBuilders.get(++position).appendNull();

// position delete files don't have sort order id
fieldBuilders.get(++position).appendNull();
}
case EQUALITY_DELETE -> {
//noinspection unchecked
List<Integer> equalityIds = dataFile.get(++position, List.class);
appendIntegerArray((ArrayBlockBuilder) fieldBuilders.get(position), equalityIds);

Integer sortOrderId = dataFile.get(++position, Integer.class);
INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
}
}
});
}

@@ -244,6 +277,19 @@ public static void appendBigintArray(ArrayBlockBuilder blockBuilder, @Nullable L
});
}

public static void appendIntegerArray(ArrayBlockBuilder blockBuilder, @Nullable List<Integer> values)
{
if (values == null) {
blockBuilder.appendNull();
return;
}
blockBuilder.buildEntry(elementBuilder -> {
for (Integer value : values) {
INTEGER.writeLong(elementBuilder, value);
}
});
}

private static void appendIntegerBigintMap(MapBlockBuilder blockBuilder, @Nullable Map<Integer, Long> values)
{
if (values == null) {
@@ -268,4 +314,43 @@ private void appendIntegerVarcharMap(MapBlockBuilder blockBuilder, @Nullable Map
VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(type, Conversions.fromByteBuffer(type, value)));
}));
}

private static void appendBoundsForPositionDelete(MapBlockBuilder blockBuilder, @Nullable Map<Integer, ByteBuffer> values)
{
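// Per the Iceberg spec, position delete files record lower/upper bounds only for
// the two reserved metadata columns: pos (a long) and file_path (a string).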
if (values == null) {
blockBuilder.appendNull();
return;
}

blockBuilder.buildEntry((keyBuilder, valueBuilder) -> {
INTEGER.writeLong(keyBuilder, DELETE_FILE_POS.fieldId());
ByteBuffer pos = values.get(DELETE_FILE_POS.fieldId());
checkArgument(pos != null, "delete file pos is null");
VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(Types.LongType.get(), Conversions.fromByteBuffer(Types.LongType.get(), pos)));

INTEGER.writeLong(keyBuilder, DELETE_FILE_PATH.fieldId());
ByteBuffer path = values.get(DELETE_FILE_PATH.fieldId());
checkArgument(path != null, "delete file path is null");
VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(Types.StringType.get(), Conversions.fromByteBuffer(Types.StringType.get(), path)));
});
}

private enum ContentType
{
DATA,
POSITION_DELETE,
EQUALITY_DELETE;

static ContentType of(int content)
{
checkArgument(content >= 0 && content <= 2, "Unexpected content type: %s", content);
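// Content values follow the Iceberg spec: 0 = data file, 1 = position deletes, 2 = equality deletes.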
if (content == 0) {
return DATA;
}
if (content == 1) {
return POSITION_DELETE;
}
return EQUALITY_DELETE;
}
}
}
@@ -38,18 +38,22 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@@ -639,6 +643,129 @@ void testEntriesTable()
}
}

@Test
void testEntriesAfterPositionDelete()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
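// A row-level DELETE on an Iceberg v2 table writes a position delete file (content = 1, asserted below)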
assertUpdate("DELETE FROM " + table.getName() + " WHERE id = 1", 1);

Table icebergTable = loadTable(table.getName());
Snapshot snapshot = icebergTable.currentSnapshot();
long snapshotId = snapshot.snapshotId();
long sequenceNumber = snapshot.sequenceNumber();

assertThat(computeScalar("SELECT status FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(1);
assertThat(computeScalar("SELECT snapshot_id FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(snapshotId);
assertThat(computeScalar("SELECT sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(sequenceNumber);
assertThat(computeScalar("SELECT file_sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(2L);

MaterializedRow deleteFile = (MaterializedRow) computeScalar("SELECT data_file FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId);
assertThat(deleteFile.getFieldCount()).isEqualTo(16);
assertThat(deleteFile.getField(0)).isEqualTo(1); // content
assertThat((String) deleteFile.getField(1)).endsWith(format.toString().toLowerCase(ENGLISH)); // file_path
assertThat(deleteFile.getField(2)).isEqualTo(format.toString()); // file_format
assertThat(deleteFile.getField(3)).isEqualTo(0); // spec_id
assertThat(deleteFile.getField(4)).isEqualTo(1L); // record_count
assertThat((long) deleteFile.getField(5)).isPositive(); // file_size_in_bytes

//noinspection unchecked
Map<Integer, Long> columnSizes = (Map<Integer, Long>) deleteFile.getField(6);
switch (format) {
case ORC -> assertThat(columnSizes).isNull();
case PARQUET -> assertThat(columnSizes)
.hasSize(2)
.satisfies(_ -> assertThat(columnSizes.get(DELETE_FILE_POS.fieldId())).isPositive())
.satisfies(_ -> assertThat(columnSizes.get(DELETE_FILE_PATH.fieldId())).isPositive());
default -> throw new IllegalArgumentException("Unsupported format: " + format);
}

assertThat(deleteFile.getField(7)).isEqualTo(Map.of(DELETE_FILE_POS.fieldId(), 1L, DELETE_FILE_PATH.fieldId(), 1L)); // value_counts
assertThat(deleteFile.getField(8)).isEqualTo(Map.of(DELETE_FILE_POS.fieldId(), 0L, DELETE_FILE_PATH.fieldId(), 0L)); // null_value_counts
assertThat(deleteFile.getField(9)).isEqualTo(value(Map.of(), null)); // nan_value_counts

// lower_bounds
//noinspection unchecked
Map<Integer, String> lowerBounds = (Map<Integer, String>) deleteFile.getField(10);
assertThat(lowerBounds)
.hasSize(2)
.satisfies(_ -> assertThat(lowerBounds.get(DELETE_FILE_POS.fieldId())).isEqualTo("0"))
.satisfies(_ -> assertThat(lowerBounds.get(DELETE_FILE_PATH.fieldId())).contains(table.getName()));

// upper_bounds
//noinspection unchecked
Map<Integer, String> upperBounds = (Map<Integer, String>) deleteFile.getField(11);
assertThat(upperBounds)
.hasSize(2)
.satisfies(_ -> assertThat(upperBounds.get(DELETE_FILE_POS.fieldId())).isEqualTo("0"))
.satisfies(_ -> assertThat(upperBounds.get(DELETE_FILE_PATH.fieldId())).contains(table.getName()));

assertThat(deleteFile.getField(12)).isNull(); // key_metadata
assertThat(deleteFile.getField(13)).isEqualTo(List.of(value(4L, 3L))); // split_offsets
assertThat(deleteFile.getField(14)).isNull(); // equality_ids
assertThat(deleteFile.getField(15)).isNull(); // sort_order_id

assertThat(computeScalar("SELECT readable_metrics FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo("""
{\
"dt":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},\
"id":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}\
}""");
}
}

@Test
void testEntriesAfterEqualityDelete()
throws Exception
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
Table icebergTable = loadTable(table.getName());
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
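// Trino does not write equality delete files itself, so create one directly through the Iceberg APIs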
writeEqualityDeleteForTable(icebergTable, fileSystemFactory, Optional.empty(), Optional.empty(), ImmutableMap.of("id", 1), Optional.empty());
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "1");

Snapshot snapshot = icebergTable.currentSnapshot();
long snapshotId = snapshot.snapshotId();
long sequenceNumber = snapshot.sequenceNumber();

assertThat(computeScalar("SELECT status FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(1);
assertThat(computeScalar("SELECT snapshot_id FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(snapshotId);
assertThat(computeScalar("SELECT sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(sequenceNumber);
assertThat(computeScalar("SELECT file_sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(2L);

MaterializedRow dataFile = (MaterializedRow) computeScalar("SELECT data_file FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId);
assertThat(dataFile.getFieldCount()).isEqualTo(16);
assertThat(dataFile.getField(0)).isEqualTo(2); // content
assertThat(dataFile.getField(3)).isEqualTo(0); // spec_id
assertThat(dataFile.getField(4)).isEqualTo(1L); // record_count
assertThat((long) dataFile.getField(5)).isPositive(); // file_size_in_bytes
assertThat(dataFile.getField(6)).isEqualTo(Map.of(1, 45L)); // column_sizes
assertThat(dataFile.getField(7)).isEqualTo(Map.of(1, 1L)); // value_counts
assertThat(dataFile.getField(8)).isEqualTo(Map.of(1, 0L)); // null_value_counts
assertThat(dataFile.getField(9)).isEqualTo(Map.of()); // nan_value_counts
assertThat(dataFile.getField(10)).isEqualTo(Map.of(1, "1")); // lower_bounds
assertThat(dataFile.getField(11)).isEqualTo(Map.of(1, "1")); // upper_bounds
assertThat(dataFile.getField(12)).isNull(); // key_metadata
assertThat(dataFile.getField(13)).isEqualTo(List.of(4L)); // split_offsets
assertThat(dataFile.getField(14)).isEqualTo(List.of(1)); // equality_ids
assertThat(dataFile.getField(15)).isEqualTo(0); // sort_order_id

assertThat(computeScalar("SELECT readable_metrics FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo("""
{\
"dt":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},\
"id":{"column_size":45,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}\
}""");
}
}

@Test
public void testPartitionColumns()
{