From 3efd8349ab18f9084b8d79e0dc0e1458dacca44c Mon Sep 17 00:00:00 2001
From: chenjian2664
Date: Thu, 23 Jan 2025 22:25:25 +0800
Subject: [PATCH] Fix error when building entries system table after position
 deletion

---
 .../io/trino/plugin/iceberg/EntriesTable.java | 107 +++++++++++++--
 .../iceberg/BaseIcebergSystemTables.java      | 127 ++++++++++++++++++
 2 files changed, 223 insertions(+), 11 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/EntriesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/EntriesTable.java
index 9c0560800826..f900bfb799fb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/EntriesTable.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/EntriesTable.java
@@ -61,6 +61,8 @@
 import static io.trino.spi.type.VarbinaryType.VARBINARY;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
 import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
 import static org.apache.iceberg.MetadataTableType.ENTRIES;
@@ -202,13 +204,26 @@ private void appendDataFile(RowBlockBuilder blockBuilder, StructProjection dataFile)
             Map<Integer, Long> nanValueCounts = dataFile.get(++position, Map.class);
             appendIntegerBigintMap((MapBlockBuilder) fieldBuilders.get(position), nanValueCounts);
 
-            //noinspection unchecked
-            Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
-            appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);
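+            // bounds are serialized per content type: data and equality delete files
+            // key them by table schema field ids, while position delete files key them
+            // by the reserved pos and file_path metadata columns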
+            switch (ContentType.of(content)) {
+                case DATA, EQUALITY_DELETE -> {
+                    //noinspection unchecked
+                    Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
+                    appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);
 
-            //noinspection unchecked
-            Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
-            appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
+                    //noinspection unchecked
+                    Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
+                    appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
+                }
+                case POSITION_DELETE -> {
+                    //noinspection unchecked
+                    Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
+                    appendBoundsForPositionDelete((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);
+
+                    //noinspection unchecked
+                    Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
+                    appendBoundsForPositionDelete((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
+                }
+            }
 
             ByteBuffer keyMetadata = dataFile.get(++position, ByteBuffer.class);
             if (keyMetadata == null) {
@@ -222,12 +237,30 @@ private void appendDataFile(RowBlockBuilder blockBuilder, StructProjection dataFile)
             //noinspection unchecked
             List<Long> splitOffsets = dataFile.get(++position, List.class);
             appendBigintArray((ArrayBlockBuilder) fieldBuilders.get(position), splitOffsets);
 
-            //noinspection unchecked
-            List<Long> equalityIds = dataFile.get(++position, List.class);
-            appendBigintArray((ArrayBlockBuilder) fieldBuilders.get(position), equalityIds);
+            switch (ContentType.of(content)) {
+                case DATA -> {
+                    // data files don't have equality ids
+                    fieldBuilders.get(++position).appendNull();
 
-            Integer sortOrderId = dataFile.get(++position, Integer.class);
-            INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
+                    Integer sortOrderId = dataFile.get(++position, Integer.class);
+                    INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
+                }
+                case POSITION_DELETE -> {
+                    // position delete files don't have equality ids
+                    fieldBuilders.get(++position).appendNull();
+
+                    // position delete files don't have sort order id
+                    fieldBuilders.get(++position).appendNull();
+                }
+                case EQUALITY_DELETE -> {
+                    //noinspection unchecked
+                    List<Integer> equalityIds = dataFile.get(++position, List.class);
+                    appendIntegerArray((ArrayBlockBuilder) fieldBuilders.get(position), equalityIds);
+
+                    Integer sortOrderId = dataFile.get(++position, Integer.class);
+                    INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
+                }
+            }
         });
     }
@@ -244,6 +277,19 @@ public static void appendBigintArray(ArrayBlockBuilder blockBuilder, @Nullable List<Long> values)
         });
     }
 
+    public static void appendIntegerArray(ArrayBlockBuilder blockBuilder, @Nullable List<Integer> values)
+    {
+        if (values == null) {
+            blockBuilder.appendNull();
+            return;
+        }
+        blockBuilder.buildEntry(elementBuilder -> {
+            for (Integer value : values) {
+                INTEGER.writeLong(elementBuilder, value);
+            }
+        });
+    }
+
     private static void appendIntegerBigintMap(MapBlockBuilder blockBuilder, @Nullable Map<Integer, Long> values)
     {
         if (values == null) {
@@ -268,4 +314,43 @@ private void appendIntegerVarcharMap(MapBlockBuilder blockBuilder, @Nullable Map<Integer, ByteBuffer> values)
             VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(type, Conversions.fromByteBuffer(type, value)));
         }));
     }
+
+    private static void appendBoundsForPositionDelete(MapBlockBuilder blockBuilder, @Nullable Map<Integer, ByteBuffer> values)
+    {
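+        // bounds of a position delete file cover only the reserved pos and file_path
+        // metadata columns, which are not table schema fields, so their types are
+        // fixed here instead of being looked up from the table schema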
+        if (values == null) {
+            blockBuilder.appendNull();
+            return;
+        }
+
+        blockBuilder.buildEntry((keyBuilder, valueBuilder) -> {
+            INTEGER.writeLong(keyBuilder, DELETE_FILE_POS.fieldId());
+            ByteBuffer pos = values.get(DELETE_FILE_POS.fieldId());
+            checkArgument(pos != null, "delete file pos is null");
+            VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(Types.LongType.get(), Conversions.fromByteBuffer(Types.LongType.get(), pos)));
+
+            INTEGER.writeLong(keyBuilder, DELETE_FILE_PATH.fieldId());
+            ByteBuffer path = values.get(DELETE_FILE_PATH.fieldId());
+            checkArgument(path != null, "delete file path is null");
+            VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(Types.StringType.get(), Conversions.fromByteBuffer(Types.StringType.get(), path)));
+        });
+    }
+
+    private enum ContentType
+    {
+        DATA,
+        POSITION_DELETE,
+        EQUALITY_DELETE;
+
+        static ContentType of(int content)
+        {
+            checkArgument(content >= 0 && content <= 2, "Unexpected content type: %s", content);
+            if (content == 0) {
+                return DATA;
+            }
+            if (content == 1) {
+                return POSITION_DELETE;
+            }
+            return EQUALITY_DELETE;
+        }
+    }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java
index 8f0d77d8820c..001d49e0b331 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java
@@ -38,6 +38,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -45,11 +46,14 @@
 import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
 import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
 import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
+import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
 import static io.trino.testing.MaterializedResult.resultBuilder;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@@ -639,6 +643,129 @@ void testEntriesTable()
         }
     }
 
+    @Test
+    void testEntriesAfterPositionDelete()
+    {
+        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE id = 1", 1);
+
+            Table icebergTable = loadTable(table.getName());
+            Snapshot snapshot = icebergTable.currentSnapshot();
+            long snapshotId = snapshot.snapshotId();
+            long sequenceNumber = snapshot.sequenceNumber();
+
+            assertThat(computeScalar("SELECT status FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(1);
+            assertThat(computeScalar("SELECT snapshot_id FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(snapshotId);
+            assertThat(computeScalar("SELECT sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(sequenceNumber);
+            assertThat(computeScalar("SELECT file_sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(2L);
+
+            MaterializedRow deleteFile = (MaterializedRow) computeScalar("SELECT data_file FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId);
+            assertThat(deleteFile.getFieldCount()).isEqualTo(16);
+            assertThat(deleteFile.getField(0)).isEqualTo(1); // content
+            assertThat((String) deleteFile.getField(1)).endsWith(format.toString().toLowerCase(ENGLISH)); // file_path
+            assertThat(deleteFile.getField(2)).isEqualTo(format.toString()); // file_format
+            assertThat(deleteFile.getField(3)).isEqualTo(0); // spec_id
+            assertThat(deleteFile.getField(4)).isEqualTo(1L); // record_count
+            assertThat((long) deleteFile.getField(5)).isPositive(); // file_size_in_bytes
+
+            //noinspection unchecked
+            Map<Integer, Long> columnSizes = (Map<Integer, Long>) deleteFile.getField(6);
+            switch (format) {
+                case ORC -> assertThat(columnSizes).isNull();
+                case PARQUET -> assertThat(columnSizes)
+                        .hasSize(2)
+                        .satisfies(_ -> assertThat(columnSizes.get(DELETE_FILE_POS.fieldId())).isPositive())
+                        .satisfies(_ -> assertThat(columnSizes.get(DELETE_FILE_PATH.fieldId())).isPositive());
+                default -> throw new IllegalArgumentException("Unsupported format: " + format);
+            }
+
+            assertThat(deleteFile.getField(7)).isEqualTo(Map.of(DELETE_FILE_POS.fieldId(), 1L, DELETE_FILE_PATH.fieldId(), 1L)); // value_counts
+            assertThat(deleteFile.getField(8)).isEqualTo(Map.of(DELETE_FILE_POS.fieldId(), 0L, DELETE_FILE_PATH.fieldId(), 0L)); // null_value_counts
+            assertThat(deleteFile.getField(9)).isEqualTo(value(Map.of(), null)); // nan_value_counts
+
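+            // bounds of a position delete file are keyed by the pos and file_path
+            // metadata columns: the single deleted row sits at position 0 of the
+            // source data file, hence "0" for both pos bounds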
+            // lower_bounds
+            //noinspection unchecked
+            Map<Integer, String> lowerBounds = (Map<Integer, String>) deleteFile.getField(10);
+            assertThat(lowerBounds)
+                    .hasSize(2)
+                    .satisfies(_ -> assertThat(lowerBounds.get(DELETE_FILE_POS.fieldId())).isEqualTo("0"))
+                    .satisfies(_ -> assertThat(lowerBounds.get(DELETE_FILE_PATH.fieldId())).contains(table.getName()));
+
+            // upper_bounds
+            //noinspection unchecked
+            Map<Integer, String> upperBounds = (Map<Integer, String>) deleteFile.getField(11);
+            assertThat(upperBounds)
+                    .hasSize(2)
+                    .satisfies(_ -> assertThat(upperBounds.get(DELETE_FILE_POS.fieldId())).isEqualTo("0"))
+                    .satisfies(_ -> assertThat(upperBounds.get(DELETE_FILE_PATH.fieldId())).contains(table.getName()));
+
+            assertThat(deleteFile.getField(12)).isNull(); // key_metadata
+            assertThat(deleteFile.getField(13)).isEqualTo(List.of(value(4L, 3L))); // split_offsets
+            assertThat(deleteFile.getField(14)).isNull(); // equality_ids
+            assertThat(deleteFile.getField(15)).isNull(); // sort_order_id
+
+            assertThat(computeScalar("SELECT readable_metrics FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo("""
+                            {\
+                            "dt":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},\
+                            "id":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}\
+                            }""");
+        }
+    }
+
+    @Test
+    void testEntriesAfterEqualityDelete()
+            throws Exception
+    {
+        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
+            Table icebergTable = loadTable(table.getName());
+            assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
+            writeEqualityDeleteForTable(icebergTable, fileSystemFactory, Optional.empty(), Optional.empty(), ImmutableMap.of("id", 1), Optional.empty());
+            assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "1");
+
+            Snapshot snapshot = icebergTable.currentSnapshot();
+            long snapshotId = snapshot.snapshotId();
+            long sequenceNumber = snapshot.sequenceNumber();
+
+            assertThat(computeScalar("SELECT status FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(1);
+            assertThat(computeScalar("SELECT snapshot_id FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(snapshotId);
+            assertThat(computeScalar("SELECT sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(sequenceNumber);
+            assertThat(computeScalar("SELECT file_sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo(2L);
+
+            MaterializedRow dataFile = (MaterializedRow) computeScalar("SELECT data_file FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId);
+            assertThat(dataFile.getFieldCount()).isEqualTo(16);
+            assertThat(dataFile.getField(0)).isEqualTo(2); // content
+            assertThat(dataFile.getField(3)).isEqualTo(0); // spec_id
+            assertThat(dataFile.getField(4)).isEqualTo(1L); // record_count
+            assertThat((long) dataFile.getField(5)).isPositive(); // file_size_in_bytes
+            assertThat(dataFile.getField(6)).isEqualTo(Map.of(1, 45L)); // column_sizes
+            assertThat(dataFile.getField(7)).isEqualTo(Map.of(1, 1L)); // value_counts
+            assertThat(dataFile.getField(8)).isEqualTo(Map.of(1, 0L)); // null_value_counts
+            assertThat(dataFile.getField(9)).isEqualTo(Map.of()); // nan_value_counts
+            assertThat(dataFile.getField(10)).isEqualTo(Map.of(1, "1")); // lower_bounds
+            assertThat(dataFile.getField(11)).isEqualTo(Map.of(1, "1")); // upper_bounds
+            assertThat(dataFile.getField(12)).isNull(); // key_metadata
+            assertThat(dataFile.getField(13)).isEqualTo(List.of(4L)); // split_offsets
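+            // equality_ids lists the field ids of the columns the equality delete
+            // matches on; the delete above was written on "id" (field id 1)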
+            assertThat(dataFile.getField(14)).isEqualTo(List.of(1)); // equality_ids
+            assertThat(dataFile.getField(15)).isEqualTo(0); // sort_order_id
+
+            assertThat(computeScalar("SELECT readable_metrics FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
+                    .isEqualTo("""
+                            {\
+                            "dt":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},\
+                            "id":{"column_size":45,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}\
+                            }""");
+        }
+    }
+
     @Test
     public void testPartitionColumns()
     {
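
Note: a minimal SQL reproduction of the failure this patch addresses, mirroring
the new testEntriesAfterPositionDelete test; the table name is illustrative,
and any Iceberg v2 table whose DELETE is written as a position delete file
behaves the same:

    CREATE TABLE test_entries AS SELECT 1 id, DATE '2014-01-01' dt;
    DELETE FROM test_entries WHERE id = 1;
    -- before this fix, reading the entries metadata table raised an error
    -- once the snapshot contained a position delete entry
    SELECT status, data_file FROM "test_entries$entries";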