Spark: Test metadata tables with format-version=3 (apache#12135)
nastra authored Feb 4, 2025
1 parent da20029 commit e406e3d
Showing 2 changed files with 139 additions and 18 deletions.
@@ -31,11 +31,16 @@
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.collections.ListUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -44,8 +49,11 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
@@ -62,6 +70,56 @@

@ExtendWith(ParameterizedTestExtension.class)
public class TestMetadataTables extends ExtensionsTestBase {
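// formatVersion is injected from the fourth element ({3}) of each
// parameters() row; indices 0-2 (catalog name, implementation, config) are
// consumed by the test base class.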
@Parameter(index = 3)
private int formatVersion;

@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}")
protected static Object[][] parameters() {
return new Object[][] {
{
SparkCatalogConfig.HIVE.catalogName(),
SparkCatalogConfig.HIVE.implementation(),
SparkCatalogConfig.HIVE.properties(),
2
},
{
SparkCatalogConfig.HADOOP.catalogName(),
SparkCatalogConfig.HADOOP.implementation(),
SparkCatalogConfig.HADOOP.properties(),
2
},
{
SparkCatalogConfig.SPARK.catalogName(),
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties(),
2
},
{
SparkCatalogConfig.SPARK.catalogName(),
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties(),
3
},
{
SparkCatalogConfig.REST.catalogName(),
SparkCatalogConfig.REST.implementation(),
ImmutableMap.builder()
.putAll(SparkCatalogConfig.REST.properties())
.put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
.build(),
2
},
{
SparkCatalogConfig.REST.catalogName(),
SparkCatalogConfig.REST.implementation(),
ImmutableMap.builder()
.putAll(SparkCatalogConfig.REST.properties())
.put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
.build(),
3
}
};
}
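
A minimal sketch of the harness pattern, using the imports above (hypothetical class name, reduced to the formatVersion axis): ParameterizedTestExtension runs every @TestTemplate method once per row returned by the static @Parameters factory, assigning each row element to the @Parameter field with the matching index.

@ExtendWith(ParameterizedTestExtension.class)
class FormatVersionSketchTest {

  @Parameter(index = 0)
  private int formatVersion;

  @Parameters(name = "formatVersion = {0}")
  protected static Object[][] parameters() {
    return new Object[][] {{2}, {3}}; // one run per format version
  }

  @TestTemplate
  void createTable() {
    // each invocation observes a single formatVersion value: 2, then 3
  }
}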

@AfterEach
public void removeTables() {
@@ -72,8 +130,8 @@ public void removeTables() {
public void testUnpartitionedTable() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);
+ "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
tableName, formatVersion);

List<SimpleRecord> records =
Lists.newArrayList(
@@ -142,15 +200,75 @@ public void testUnpartitionedTable() throws Exception {
TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
}

@TestTemplate
public void testPositionDeletesTable() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
tableName, formatVersion);

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c"),
new SimpleRecord(4, "d"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(tableName)
.append();

sql("DELETE FROM %s WHERE id=1 OR id=3", tableName);

// check delete files table
assertThat(sql("SELECT * FROM %s.delete_files", tableName)).hasSize(1);
Table table = Spark3Util.loadIcebergTable(spark, tableName);
DataFile dataFile = Iterables.getOnlyElement(TestHelpers.dataFiles(table));
DeleteFile deleteFile = Iterables.getOnlyElement(TestHelpers.deleteFiles(table));

List<Object[]> expectedRows;
if (formatVersion >= 3) {
expectedRows =
ImmutableList.of(
row(
dataFile.location(),
0L,
null,
dataFile.specId(),
deleteFile.location(),
deleteFile.contentOffset(),
deleteFile.contentSizeInBytes()),
row(
dataFile.location(),
2L,
null,
dataFile.specId(),
deleteFile.location(),
deleteFile.contentOffset(),
deleteFile.contentSizeInBytes()));
} else {
expectedRows =
ImmutableList.of(
row(dataFile.location(), 0L, null, dataFile.specId(), deleteFile.location()),
row(dataFile.location(), 2L, null, dataFile.specId(), deleteFile.location()));
}

// check position_deletes table
assertThat(sql("SELECT * FROM %s.position_deletes", tableName))
.hasSize(2)
.containsExactlyElementsOf(expectedRows);
}
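The formatVersion branch above captures the observable difference between the two spec versions: under format-version 2 a positional delete row exposes file_path, pos, row, spec_id, and delete_file_path, while format-version 3 stores position deletes as deletion vectors and additionally reports content_offset and content_size_in_bytes, the byte range the vector occupies inside the Puffin file that holds it.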

@TestTemplate
public void testPartitionedTable() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) "
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);
+ "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
tableName, formatVersion);

List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -241,8 +359,8 @@ public void testPartitionedTable() throws Exception {
public void testAllFilesUnpartitioned() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);
+ "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
tableName, formatVersion);

List<SimpleRecord> records =
Lists.newArrayList(
@@ -319,8 +437,8 @@ public void testAllFilesPartitioned() throws Exception {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);
+ "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
tableName, formatVersion);

List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -409,8 +527,8 @@ public void testMetadataLogEntries() throws Exception {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES "
+ "('format-version'='2')",
tableName);
+ "('format-version'='%s')",
tableName, formatVersion);

List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -498,8 +616,8 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);
+ "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
tableName, formatVersion);

List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -561,8 +679,8 @@ public void testSnapshotReferencesMetatable() throws Exception {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);
+ "('format-version'='%s', 'write.delete.mode'='merge-on-read')",
tableName, formatVersion);

List<SimpleRecord> recordsA =
Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
@@ -753,8 +871,8 @@ public void metadataLogEntriesAfterReplacingTable() throws Exception {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES "
+ "('format-version'='2')",
tableName);
+ "('format-version'='%s')",
tableName, formatVersion);

Table table = Spark3Util.loadIcebergTable(spark, tableName);
TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
@@ -813,8 +931,8 @@ public void metadataLogEntriesAfterReplacingTable() throws Exception {
+ "USING iceberg "
+ "PARTITIONED BY (data) "
+ "TBLPROPERTIES "
+ "('format-version'='2')",
tableName);
+ "('format-version'='%s')",
tableName, formatVersion);

tableMetadata = ((HasTableOperations) table).operations().refresh();
assertThat(tableMetadata.snapshots()).hasSize(2);
Second changed file:
@@ -84,6 +84,9 @@ public InternalRow next() {
rowValues.add(deleteFile.contentOffset());
} else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID) {
rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile));
+ } else if (fieldId == MetadataColumns.DELETE_FILE_ROW_FIELD_ID) {
+   // DVs don't track the row that was deleted
+   rowValues.add(null);
}
}
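This is the reader-side counterpart to the new test above: a deletion vector is a bitmap of deleted positions, so when a scan projects the optional row column of the position_deletes schema there is no deleted-row content to return, and the reader fills in an explicit null for that field id rather than leaving the projected row unresolved.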
