Skip to content

Commit

Permalink
Add rollback_to_timestamp table procedure in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Feb 7, 2025
1 parent b1d2302 commit a68596c
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 0 deletions.
23 changes: 23 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,29 @@ state of the table to a previous snapshot id:
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_snapshot(8954597067493422955);
```

The table procedure `rollback_to_timestamp` allows the caller to roll back the
state of the table to latest snapshot before or at the specified timestamp.

```sql
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 10:10:10.000')
```

Assuming that the session time zone is `America/Los_Angeles` the following queries are equivalent:

```sql
ALTER TABLE testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01')
```

```sql
SELECT *
FROM example.testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 00:00:00');
```

```sql
SELECT *
FROM example.testdb.customer_orders EXECUTE rollback_to_timestamp(TIMESTAMP '2025-01-01 00:00:00.000 America/Los_Angeles');
```

#### `NOT NULL` column constraint

The Iceberg connector supports setting `NOT NULL` constraints on the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,7 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
case OPTIMIZE -> getTableHandleForOptimize(tableHandle, icebergTable, executeProperties, retryMode);
case DROP_EXTENDED_STATS -> getTableHandleForDropExtendedStats(session, tableHandle);
case ROLLBACK_TO_SNAPSHOT -> getTableHandleForRollbackToSnapshot(session, tableHandle, executeProperties);
case ROLLBACK_TO_TIMESTAMP -> getTableHandleForRollbackToTimestamp(session, tableHandle, executeProperties);
case EXPIRE_SNAPSHOTS -> getTableHandleForExpireSnapshots(session, tableHandle, executeProperties);
case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties);
case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties);
Expand Down Expand Up @@ -1777,6 +1778,22 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForRollbackToSnapsho
icebergTable.io().properties()));
}

private Optional<ConnectorTableExecuteHandle> getTableHandleForRollbackToTimestamp(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
Instant instant = (Instant) executeProperties.get("snapshot_timestamp");
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

long snapshotId = getSnapshotIdAsOfTime(icebergTable, instant.toEpochMilli());

// reuse rollback_to_snapshot
return Optional.of(new IcebergTableExecuteHandle(
tableHandle.getSchemaTableName(),
ROLLBACK_TO_SNAPSHOT,
new IcebergRollbackToSnapshotHandle(snapshotId),
icebergTable.location(),
icebergTable.io().properties()));
}

private static Object requireProcedureArgument(Map<String, Object> properties, String name)
{
Object value = properties.get(name);
Expand All @@ -1793,6 +1810,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession
return getLayoutForOptimize(session, executeHandle);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1823,6 +1841,7 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle
return beginOptimize(session, executeHandle, table);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down Expand Up @@ -1869,6 +1888,7 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
return;
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTableProcedure;
import io.trino.plugin.iceberg.procedure.RollbackToSnapshotTimestampTableProcedure;
import io.trino.plugin.iceberg.procedure.UnregisterTableProcedure;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
Expand Down Expand Up @@ -132,6 +133,7 @@ public void configure(Binder binder)
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(DropExtendedStatsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RollbackToSnapshotTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RollbackToSnapshotTimestampTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(ExpireSnapshotsTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
pageSorter);
case DROP_EXTENDED_STATS:
case ROLLBACK_TO_SNAPSHOT:
case ROLLBACK_TO_TIMESTAMP:
case EXPIRE_SNAPSHOTS:
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum IcebergTableProcedureId
OPTIMIZE,
DROP_EXTENDED_STATS,
ROLLBACK_TO_SNAPSHOT,
ROLLBACK_TO_TIMESTAMP,
EXPIRE_SNAPSHOTS,
REMOVE_ORPHAN_FILES,
ADD_FILES,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.SqlTimestampWithTimeZone;

import java.time.Instant;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ROLLBACK_TO_TIMESTAMP;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;

public class RollbackToSnapshotTimestampTableProcedure
implements Provider<TableProcedureMetadata>
{
@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
ROLLBACK_TO_TIMESTAMP.name(),
coordinatorOnly(),
ImmutableList.<PropertyMetadata<?>>builder()
.add(new PropertyMetadata<>(
"snapshot_timestamp",
"Snapshot timestamp",
TIMESTAMP_TZ_MILLIS,
Instant.class,
null,
false,
value -> ((SqlTimestampWithTimeZone) value).toZonedDateTime().toInstant(),
instant -> SqlTimestampWithTimeZone.fromInstant(3, instant, UTC_KEY.getZoneId())))
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,49 @@ private void testRollbackSnapshot(String rollbackToSnapshotFormat)
assertUpdate("DROP TABLE test_rollback");
}

@Test
public void testRollbackTimestamp()
{
testRollbackTimestamp("ALTER TABLE tpch.test_rollback_timestamp EXECUTE rollback_to_timestamp(TIMESTAMP '%s')");
testRollbackTimestamp("ALTER TABLE tpch.test_rollback_timestamp EXECUTE rollback_to_timestamp(snapshot_timestamp => TIMESTAMP '%s')");
}

private void testRollbackTimestamp(String rollbackToTimestampForamt)
{
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS VV");

assertUpdate("CREATE TABLE test_rollback_timestamp (col0 INTEGER, col1 BIGINT)");
String afterCreateTableTimestamp = getCurrentCommitTimestamp("test_rollback_timestamp").format(timeFormatter);

assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (123, CAST(987 AS BIGINT))", 1);
String afterFirstInsertTimestamp = getCurrentCommitTimestamp("test_rollback_timestamp").format(timeFormatter);
assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

// Check that rollback_to_snapshot can be executed also when it does not do any changes
assertUpdate(format(rollbackToTimestampForamt, afterFirstInsertTimestamp));
assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1);
assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))");

assertUpdate(format(rollbackToTimestampForamt, afterFirstInsertTimestamp));
assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))");

assertUpdate(format(rollbackToTimestampForamt, afterCreateTableTimestamp));
assertThat((long) computeActual("SELECT COUNT(*) FROM test_rollback_timestamp").getOnlyValue()).isEqualTo(0);

assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1);
String afterSecondInsertTimestamp = getCurrentCommitTimestamp("test_rollback_timestamp").format(timeFormatter);

// extra insert which should be dropped on rollback
assertUpdate("INSERT INTO test_rollback_timestamp (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1);

assertUpdate(format(rollbackToTimestampForamt, afterSecondInsertTimestamp));
assertQuery("SELECT * FROM test_rollback_timestamp ORDER BY col0", "VALUES (789, CAST(987 AS BIGINT))");

assertUpdate("DROP TABLE test_rollback_timestamp");
}

@Test
void testRollbackToSnapshotWithNullArgument()
{
Expand Down Expand Up @@ -8868,6 +8911,11 @@ private long getCurrentSnapshotId(String tableName)
return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
}

private ZonedDateTime getCurrentCommitTimestamp(String tableName)
{
return (ZonedDateTime) computeScalar("SELECT committed_at FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
}

private String getIcebergTableDataPath(String tableLocation)
{
return tableLocation + "/data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.function.Consumer;

Expand Down Expand Up @@ -366,13 +369,42 @@ public void testRollbackToSnapshot()
onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testRollbackToTimestamp()
throws InterruptedException
{
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS VV");

String tableName = "test_rollback_to_timestamp_" + randomNameSuffix();

onTrino().executeQuery("USE iceberg.default");
onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName));
onTrino().executeQuery(format("CREATE TABLE %s (a INTEGER)", tableName));
Thread.sleep(1);
onTrino().executeQuery(format("INSERT INTO %s VALUES 1", tableName));
String snapshotTimestamp = timeFormatter.format(getCurrentCommitTimestamp(tableName).toInstant().atZone(ZoneId.of("UTC")));
Thread.sleep(1);
onTrino().executeQuery(format("INSERT INTO %s VALUES 2", tableName));
onTrino().executeQuery(format("ALTER TABLE %s EXECUTE rollback_to_timestamp(TIMESTAMP '%s')", tableName, snapshotTimestamp));
assertThat(onTrino().executeQuery(format("SELECT * FROM %s", tableName)))
.containsOnly(row(1));
onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName));
}

private long getSecondOldestTableSnapshot(String tableName)
{
return (Long) onTrino().executeQuery(
format("SELECT snapshot_id FROM iceberg.default.\"%s$snapshots\" WHERE parent_id IS NOT NULL ORDER BY committed_at FETCH FIRST 1 ROW WITH TIES", tableName))
.getOnlyValue();
}

private Timestamp getCurrentCommitTimestamp(String tableName)
{
return (Timestamp) onTrino().executeQuery(
format("SELECT committed_at FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName))
.getOnlyValue();
}

@DataProvider
public static Object[][] fileFormats()
{
Expand Down

0 comments on commit a68596c

Please sign in to comment.