diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index a96c7b5c409e..bb7bf9c604f7 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -1720,6 +1720,29 @@ state of the table to a previous snapshot id: ALTER TABLE testdb.customer_orders EXECUTE rollback_to_snapshot(8954597067493422955); ``` +The table procedure `rollback_to_timestamp` allows the caller to roll back the +state of the table to latest snapshot before or at the specified timestamp. + +```sql +ALTER TABLE testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 10:10:10.000') +``` + +Assuming that the session time zone is `America/Los_Angeles` the following queries are equivalent: + +```sql +ALTER TABLE testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01') +``` + +```sql +SELECT * +FROM example.testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 00:00:00'); +``` + +```sql +SELECT * +FROM example.testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 00:00:00.000 America/Los_Angeles'); +``` + #### `NOT NULL` column constraint The Iceberg connector supports setting `NOT NULL` constraints on the table diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index f94341c3b584..9f5b9e61a642 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -1594,6 +1594,7 @@ public Optional getTableHandleForExecute( case OPTIMIZE -> getTableHandleForOptimize(tableHandle, icebergTable, executeProperties, retryMode); case DROP_EXTENDED_STATS -> getTableHandleForDropExtendedStats(session, tableHandle); case ROLLBACK_TO_SNAPSHOT -> getTableHandleForRollbackToSnapshot(session, tableHandle, executeProperties); + case ROLLBACK_TO_TIMESTAMP -> getTableHandleForRollbackToTimestamp(session, tableHandle, executeProperties); case EXPIRE_SNAPSHOTS -> getTableHandleForExpireSnapshots(session, tableHandle, executeProperties); case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties); case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties); @@ -1777,6 +1778,22 @@ private Optional getTableHandleForRollbackToSnapsho icebergTable.io().properties())); } + private Optional getTableHandleForRollbackToTimestamp(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) + { + Instant instant = (Instant) executeProperties.get("snapshot_timestamp"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + + long snapshotId = getSnapshotIdAsOfTime(icebergTable, instant.toEpochMilli()); + + // reuse rollback_to_snapshot + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + ROLLBACK_TO_SNAPSHOT, + new IcebergRollbackToSnapshotHandle(snapshotId), + icebergTable.location(), + icebergTable.io().properties())); + } + private static Object requireProcedureArgument(Map properties, String name) { Object value = properties.get(name); @@ -1793,6 +1810,7 @@ public Optional getLayoutForTableExecute(ConnectorSession return getLayoutForOptimize(session, executeHandle); case DROP_EXTENDED_STATS: case ROLLBACK_TO_SNAPSHOT: + case ROLLBACK_TO_TIMESTAMP: case EXPIRE_SNAPSHOTS: case REMOVE_ORPHAN_FILES: case ADD_FILES: @@ -1823,6 +1841,7 @@ public BeginTableExecuteResult +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + ROLLBACK_TO_TIMESTAMP.name(), + coordinatorOnly(), + ImmutableList.>builder() + .add(new PropertyMetadata<>( + "snapshot_timestamp", + "Snapshot timestamp", + TIMESTAMP_TZ_MILLIS, + Instant.class, + null, + false, + value -> ((SqlTimestampWithTimeZone) value).toZonedDateTime().toInstant(), + instant -> SqlTimestampWithTimeZone.fromInstant(3, instant, UTC_KEY.getZoneId()))) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 251affd72387..391ccbe82d87 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -1700,6 +1700,49 @@ private void testRollbackSnapshot(String rollbackToSnapshotFormat) assertUpdate("DROP TABLE test_rollback"); } + @Test + public void testRollbackTimestamp() + { + testRollbackTimestamp("ALTER TABLE tpch.test_rollback_timestamp EXECUTE rollback_to_timestamp(TIMESTAMP '%s')"); + testRollbackTimestamp("ALTER TABLE tpch.test_rollback_timestamp EXECUTE rollback_to_timestamp(snapshot_timestamp => TIMESTAMP '%s')"); + } + + private void testRollbackTimestamp(String rollbackToTimestampForamt) + { + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS VV"); + + assertUpdate("CREATE TABLE test_rollback_timestamp (col0 INTEGER, col1 BIGINT)"); + String afterCreateTableTimestamp = getCurrentCommitTimestamp("test_rollback_timestamp").format(timeFormatter); + + assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (123, CAST(987 AS BIGINT))", 1); + String afterFirstInsertTimestamp = getCurrentCommitTimestamp("test_rollback_timestamp").format(timeFormatter); + assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))"); + + // Check that rollback_to_snapshot can be executed also when it does not do any changes + assertUpdate(format(rollbackToTimestampForamt, afterFirstInsertTimestamp)); + assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))"); + + assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1); + assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))"); + + assertUpdate(format(rollbackToTimestampForamt, afterFirstInsertTimestamp)); + assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))"); + + assertUpdate(format(rollbackToTimestampForamt, afterCreateTableTimestamp)); + assertThat((long) computeActual("SELECT COUNT(*) FROM test_rollback_timestamp").getOnlyValue()).isEqualTo(0); + + assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1); + String afterSecondInsertTimestamp = getCurrentCommitTimestamp("test_rollback_timestamp").format(timeFormatter); + + // extra insert which should be dropped on rollback + assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1); + + assertUpdate(format(rollbackToTimestampForamt, afterSecondInsertTimestamp)); + assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (789, CAST(987 AS BIGINT))"); + + assertUpdate("DROP TABLE test_rollback_timestamp"); + } + @Test void testRollbackToSnapshotWithNullArgument() { @@ -8868,6 +8911,11 @@ private long getCurrentSnapshotId(String tableName) return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); } + private ZonedDateTime getCurrentCommitTimestamp(String tableName) + { + return (ZonedDateTime) computeScalar("SELECT committed_at FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); + } + private String getIcebergTableDataPath(String tableLocation) { return tableLocation + "/data"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java index 07ad8b4eb20b..bed338c4ee79 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java @@ -19,6 +19,9 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.sql.Timestamp; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.function.Consumer; @@ -366,6 +369,28 @@ public void testRollbackToSnapshot() onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testRollbackToTimestamp() + throws InterruptedException + { + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS VV"); + + String tableName = "test_rollback_to_timestamp_" + randomNameSuffix(); + + onTrino().executeQuery("USE iceberg.default"); + onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); + onTrino().executeQuery(format("CREATE TABLE %s (a INTEGER)", tableName)); + Thread.sleep(1); + onTrino().executeQuery(format("INSERT INTO %s VALUES 1", tableName)); + String snapshotTimestamp = timeFormatter.format(getCurrentCommitTimestamp(tableName).toInstant().atZone(ZoneId.of("UTC"))); + Thread.sleep(1); + onTrino().executeQuery(format("INSERT INTO %s VALUES 2", tableName)); + onTrino().executeQuery(format("ALTER TABLE %s EXECUTE rollback_to_timestamp(TIMESTAMP '%s')", tableName, snapshotTimestamp)); + assertThat(onTrino().executeQuery(format("SELECT * FROM %s", tableName))) + .containsOnly(row(1)); + onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); + } + private long getSecondOldestTableSnapshot(String tableName) { return (Long) onTrino().executeQuery( @@ -373,6 +398,13 @@ private long getSecondOldestTableSnapshot(String tableName) .getOnlyValue(); } + private Timestamp getCurrentCommitTimestamp(String tableName) + { + return (Timestamp) onTrino().executeQuery( + format("SELECT committed_at FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName)) + .getOnlyValue(); + } + @DataProvider public static Object[][] fileFormats() {