diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputeTableStatsProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputeTableStatsProcedure.java
new file mode 100644
index 000000000000..1597c47bd5d3
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputeTableStatsProcedure.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.BlobMetadata;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.actions.NDVSketchUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestComputeTableStatsProcedure extends SparkExtensionsTestBase {
+
+  public TestComputeTableStatsProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testProcedureOnEmptyTable() throws NoSuchTableException, ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    List<Object[]> result =
+        sql("CALL %s.system.compute_table_stats('%s')", catalogName, tableIdent);
+    assertThat(result).isEmpty();
+  }
+
+  @Test
+  public void testProcedureWithNamedArgs() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.compute_table_stats(table => '%s', columns => array('id'))",
+            catalogName, tableIdent);
+    assertThat(output.get(0)).isNotEmpty();
+    Object obj = output.get(0)[0];
+    assertThat(obj.toString()).endsWith(".stats");
+    verifyTableStats(tableName);
+  }
+
+  @Test
+  public void testProcedureWithPositionalArgs() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Snapshot snapshot = table.currentSnapshot();
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.compute_table_stats('%s', %dL)",
+            catalogName, tableIdent, snapshot.snapshotId());
+    assertThat(output.get(0)).isNotEmpty();
+    Object obj = output.get(0)[0];
+    assertThat(obj.toString()).endsWith(".stats");
+    verifyTableStats(tableName);
+  }
+
+  @Test
+  public void testProcedureWithInvalidColumns() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_table_stats(table => '%s', columns => array('id1'))",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Can't find column id1");
+  }
+
+  @Test
+  public void testProcedureWithInvalidSnapshot() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_table_stats(table => '%s', snapshot_id => %dL)",
+                    catalogName, tableIdent, 1234L))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Snapshot not found");
+  }
+
+  @Test
+  public void testProcedureWithInvalidTable() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_table_stats(table => '%s', snapshot_id => %dL)",
+                    catalogName, TableIdentifier.of(Namespace.of("default"), "abcd"), 1234L))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Couldn't load table");
+  }
+
+  void verifyTableStats(String tableName) throws NoSuchTableException, ParseException {
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    StatisticsFile statisticsFile = table.statisticsFiles().get(0);
+    BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
+    assertThat(blobMetadata.properties())
+        .containsKey(NDVSketchUtil.APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY);
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputeTableStatsProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputeTableStatsProcedure.java
new file mode 100644
index 000000000000..1c2d7125a38a
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputeTableStatsProcedure.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.ComputeTableStats.Result;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A procedure that computes statistics of a table.
+ *
+ * @see SparkActions#computeTableStats(Table)
+ */
+public class ComputeTableStatsProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      ProcedureParameter.optional("snapshot_id", DataTypes.LongType);
+  private static final ProcedureParameter COLUMNS_PARAM =
+      ProcedureParameter.optional("columns", STRING_ARRAY);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM, COLUMNS_PARAM};
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("statistics_file", DataTypes.StringType, true, Metadata.empty())
+          });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<ComputeTableStatsProcedure>() {
+      @Override
+      protected ComputeTableStatsProcedure doBuild() {
+        return new ComputeTableStatsProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private ComputeTableStatsProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+    String[] columns = input.asStringArray(COLUMNS_PARAM, null);
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          ComputeTableStats action = actions().computeTableStats(table);
+
+          if (snapshotId != null) {
+            action.snapshot(snapshotId);
+          }
+
+          if (columns != null) {
+            action.columns(columns);
+          }
+
+          Result result = action.execute();
+          return toOutputRows(result);
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {
+    StatisticsFile statisticsFile = result.statisticsFile();
+    if (statisticsFile != null) {
+      InternalRow row = newInternalRow(UTF8String.fromString(statisticsFile.path()));
+      return new InternalRow[] {row};
+    } else {
+      return new InternalRow[0];
+    }
+  }
+
+  @Override
+  public String description() {
+    return "ComputeTableStatsProcedure";
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 42003b24e94c..d636a21ddc00 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -61,6 +61,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
     mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
     mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
+    mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
     return mapBuilder.build();
   }
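A minimal usage sketch for the newly registered procedure, following the CALL syntax exercised by the tests above; `spark_catalog` and `db.sample` are placeholder names, not part of this diff:

    -- compute NDV statistics for every column of the current snapshot
    CALL spark_catalog.system.compute_table_stats(table => 'db.sample');

    -- restrict the computation to selected columns and a specific snapshot
    CALL spark_catalog.system.compute_table_stats(
      table => 'db.sample', snapshot_id => 1234567890123L, columns => array('id'));

Each call returns the path of the statistics file registered on the table (the `statistics_file` output column); on a table with no current snapshot the procedure returns no rows, as covered by testProcedureOnEmptyTable.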