
[DELTA] Trino concurrent CTAS query failure deletes Data Directory, rendering succeeded one in un-queryable state #24153

Open
vinay-kl opened this issue Nov 17, 2024 · 2 comments
Labels
bug Something isn't working delta-lake Delta Lake connector

Comments

@vinay-kl
Contributor

Hello team, we are using v454 in production and are facing the following issue.

When two CTAS queries create a Delta Lake table with the same name concurrently, the query that fails cleans up the table's data directory as part of its rollback. This leaves the table created by the query that succeeded unusable: it cannot be queried, and it cannot be dropped from Trino either.
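The failure mode can be modelled with plain java.nio.file operations. This is a simplified, single-threaded sketch, not Trino code: the table layout (a base directory holding one data file per query plus _delta_log/00000000000000000000.json), the class and method names, and running the two writers one after the other are all assumptions standing in for the two concurrent CTAS queries. Files.createFile fails with FileAlreadyExistsException when the file already exists, which plays the role of the create-if-absent log write in the stack trace below.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class ConcurrentCtasRace {
    // Hypothetical location of the first Delta Lake transaction log entry
    static Path firstLogEntry(Path table) {
        return table.resolve("_delta_log").resolve("00000000000000000000.json");
    }

    // Hypothetical CTAS commit: write this query's data file, then create log version 0.
    // Files.createFile throws FileAlreadyExistsException if another writer created it first.
    static void commit(Path table, String queryId) throws IOException {
        Files.createDirectories(table.resolve("_delta_log"));
        Files.writeString(table.resolve(queryId + ".parquet"), "data", StandardCharsets.UTF_8);
        Files.createFile(firstLogEntry(table));
    }

    // Hypothetical rollback modelled on the issue: recursively delete the whole table location
    static void naiveRollback(Path table) throws IOException {
        try (Stream<Path> paths = Files.walk(table)) {
            // Children sort after their parents, so reverse order deletes files before directories
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }

    // Returns true if the winner's first log entry survives the loser's rollback
    static boolean run() {
        try {
            Path table = Files.createTempDirectory("sample1");
            commit(table, "winner");      // first CTAS commits version 0
            try {
                commit(table, "loser");   // second CTAS loses the race for version 0
            }
            catch (FileAlreadyExistsException e) {
                naiveRollback(table);     // also deletes the winner's log and data file
            }
            return Files.exists(firstLogEntry(table));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("winner's log survives: " + run());
    }
}
```

Running it prints "winner's log survives: false": the losing query's rollback removes the winning query's first log entry together with its data, which matches the un-queryable table described above.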

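Reproduction Steps (the two create table delta.dev.sample1 statements below are run concurrently, in separate sessions)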

trino> create table delta.dev.delete_trino_test_stg1 as SELECT a, b, 20220101 as d FROM UNNEST(SEQUENCE(1, 9001), SEQUENCE(1, 9001)) AS t(a, b);
CREATE TABLE: 9001 rows

trino> create table delta.dev.delete_trino_test_stg2 as SELECT a, b, 20220101 as d FROM UNNEST(SEQUENCE(1, 9001), SEQUENCE(1, 9001)) AS t(a, b);
CREATE TABLE: 9001 rows

trino> create table delta.dev.sample1 as ((select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d));
CREATE TABLE: 243054003 rows

trino> create table delta.dev.sample1 as ((select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d) union all (select stg1.a as a, stg1.b as b, stg1.d as d from dev.delete_trino_test_stg1 stg1, dev.delete_trino_test_stg2 stg2 where stg1.d=stg2.d));

Query 20241117_112931_00025_me7j4, FAILED, 1 node

Query 20241117_112931_00025_me7j4 failed: Failed to write Delta Lake transaction log entry
io.trino.spi.TrinoException: Failed to write Delta Lake transaction log entry
	at io.trino.plugin.deltalake.DeltaLakeMetadata.finishCreateTable(DeltaLakeMetadata.java:1763)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishCreateTable(ClassLoaderSafeConnectorMetadata.java:578)
	at io.trino.tracing.TracingConnectorMetadata.finishCreateTable(TracingConnectorMetadata.java:659)
	at io.trino.metadata.MetadataManager.finishCreateTable(MetadataManager.java:1190)
	at io.trino.tracing.TracingMetadata.finishCreateTable(TracingMetadata.java:629)
	at io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4104)
	at io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)
	at io.trino.operator.Driver.processInternal(Driver.java:403)
	at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
	at io.trino.operator.Driver.tryWithLock(Driver.java:709)
	at io.trino.operator.Driver.process(Driver.java:298)
	at io.trino.operator.Driver.processForDuration(Driver.java:269)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
	at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:77)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
	at io.trino.$gen.Trino_dev____20241117_111225_2.run(Unknown Source)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
	at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:168)
	at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:155)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.UncheckedIOException: java.nio.file.FileAlreadyExistsException: /tmp/dev/sample1/_delta_log/00000000000000000000.json
	at io.trino.plugin.deltalake.transactionlog.writer.NoIsolationSynchronizer.write(NoIsolationSynchronizer.java:52)
	at io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter.flush(TransactionLogWriter.java:110)
	at io.trino.plugin.deltalake.DeltaLakeMetadata.finishCreateTable(DeltaLakeMetadata.java:1705)
	... 25 more
Caused by: java.nio.file.FileAlreadyExistsException: /tmp/dev/sample1/_delta_log/00000000000000000000.json
	at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:104)
	at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:62)
	at io.trino.filesystem.TrinoOutputFile.create(TrinoOutputFile.java:33)
	at io.trino.filesystem.tracing.TracingOutputFile.lambda$create$0(TracingOutputFile.java:47)
	at io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:47)
	at io.trino.filesystem.tracing.TracingOutputFile.create(TracingOutputFile.java:47)
	at io.trino.plugin.deltalake.transactionlog.writer.NoIsolationSynchronizer.write(NoIsolationSynchronizer.java:47)
	... 27 more
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: /tmp/dev/sample1/_delta_log/00000000000000000000.json
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:557)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:595)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:642)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:730)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:709)
	at io.trino.hdfs.TrinoFileSystemCache$FileSystemWrapper.create(TrinoFileSystemCache.java:364)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1229)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1206)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1087)
	at io.trino.filesystem.hdfs.HdfsOutputFile.lambda$create$1(HdfsOutputFile.java:100)
	at io.trino.hdfs.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:27)
	at io.trino.hdfs.HdfsEnvironment.doAs(HdfsEnvironment.java:134)
	at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:115)
	at io.trino.filesystem.hdfs.HdfsOutputFile.create(HdfsOutputFile.java:100)
	... 33 more
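The trace shows the root cause wrapped several times: the TrinoException wraps an UncheckedIOException, which wraps java.nio.file.FileAlreadyExistsException, which in turn wraps Hadoop's own FileAlreadyExistsException. Any check for this failure therefore has to walk the cause chain rather than test the top-level exception. A minimal sketch; the helper name is hypothetical, and a RuntimeException stands in for TrinoException so that it compiles without Trino on the classpath.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

final class CauseChain {
    // Returns true if any throwable in the cause chain is a java.nio FileAlreadyExistsException.
    // The identity set stops the walk if a (malformed) cause chain contains a cycle.
    static boolean causedByFileAlreadyExists(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable current = t; current != null && seen.add(current); current = current.getCause()) {
            if (current instanceof FileAlreadyExistsException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Rebuild the shape of the chain from the trace (TrinoException replaced by RuntimeException)
        FileAlreadyExistsException nioFailure =
                new FileAlreadyExistsException("/tmp/dev/sample1/_delta_log/00000000000000000000.json");
        nioFailure.initCause(new IOException("File already exists (stand-in for the Hadoop exception)"));
        RuntimeException top = new RuntimeException(
                "Failed to write Delta Lake transaction log entry", new UncheckedIOException(nioFailure));

        System.out.println(causedByFileAlreadyExists(top));                                 // true
        System.out.println(causedByFileAlreadyExists(new RuntimeException("other error"))); // false
    }
}
```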

Rollback Stack Trace

java.base/java.lang.Thread.getStackTrace(Thread.java:2418)
io.trino.filesystem.hdfs.HdfsFileSystem.deleteDirectory(HdfsFileSystem.java:164)
io.trino.filesystem.switching.SwitchingFileSystem.deleteDirectory(SwitchingFileSystem.java:92)
io.trino.filesystem.tracing.TracingFileSystem.lambda$deleteDirectory$2(TracingFileSystem.java:89)
io.trino.filesystem.tracing.Tracing.lambda$withTracing$1(Tracing.java:38)
io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:47)
io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:37)
io.trino.filesystem.tracing.TracingFileSystem.deleteDirectory(TracingFileSystem.java:89)
io.trino.plugin.deltalake.DeltaLakeMetadata.deleteRecursivelyIfExists(DeltaLakeMetadata.java:1608)
io.trino.plugin.deltalake.DeltaLakeMetadata.lambda$beginCreateTable$39(DeltaLakeMetadata.java:1496)
java.base/java.util.Optional.ifPresent(Optional.java:178)
io.trino.plugin.deltalake.DeltaLakeMetadata.rollback(DeltaLakeMetadata.java:3345)
io.trino.plugin.deltalake.DeltaLakeTransactionManager.lambda$rollback$1(DeltaLakeTransactionManager.java:69)
java.base/java.util.Optional.ifPresent(Optional.java:178)
io.trino.plugin.deltalake.DeltaLakeTransactionManager.rollback(DeltaLakeTransactionManager.java:67)
io.trino.plugin.deltalake.DeltaLakeConnector.rollback(DeltaLakeConnector.java:215)
io.trino.metadata.CatalogTransaction.abort(CatalogTransaction.java:93)
io.trino.metadata.CatalogMetadata.safeAbort(CatalogMetadata.java:166)
io.trino.metadata.CatalogMetadata.abort(CatalogMetadata.java:160)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:79)
io.trino.$gen.Trino_dev____20241117_122247_2.run(Unknown Source)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
java.base/java.lang.Thread.run(Thread.java:1570)
@vinay-kl
Contributor Author

@findinpath For this particular case, can we catch the exception and propagate it so that the rollback does not delete the table base location directory?

In addition, the following catch block in DeltaLakeMetadata.finishCreateTable deletes the _delta_log directory when handle.readVersion() is empty, which needs to be handled as well:

catch (Exception e) {
    // Remove the transaction log entry if the table creation fails
    if (!writeCommitted) {
        // TODO perhaps it should happen in a background thread (https://github.com/trinodb/trino/issues/12011)
        cleanupFailedWrite(session, handle.location(), dataFileInfos);
    }
    if (handle.readVersion().isEmpty()) {
        Location transactionLogDir = Location.of(getTransactionLogDir(location));
        try {
            fileSystemFactory.create(session).deleteDirectory(transactionLogDir);
        }
        catch (IOException ioException) {
            // Nothing to do, the IOException is probably the same reason why the initial write failed
            LOG.error(ioException, "Transaction log cleanup failed during CREATE TABLE rollback");
        }
    }
    throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
}
return Optional.empty();
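The first proposal above (propagating the failure without deleting the table base location) can be sketched as a per-transaction rollback registry: the location cleanup is registered when the table is created, but the commit path can veto it once the failure is known to be a lost race for the first log entry. All names here (CreateTableRollback, onBeginCreateTable, onCommitFailure, the lostCommitRace flag) are hypothetical; the sketch only mirrors the shape of the rollback stack trace, where a cleanup registered in beginCreateTable is run from rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of per-transaction rollback state for CREATE TABLE AS
final class CreateTableRollback {
    private final List<Runnable> rollbackActions = new ArrayList<>();
    private boolean lostCommitRace;

    // Registered when the table location is prepared (cf. beginCreateTable in the trace)
    void onBeginCreateTable(Runnable deleteTableLocation) {
        rollbackActions.add(() -> {
            // If another query already committed version 0 here, its files must survive
            if (!lostCommitRace) {
                deleteTableLocation.run();
            }
        });
    }

    // Called when writing the first transaction log entry fails
    void onCommitFailure(boolean causedByFileAlreadyExists) {
        lostCommitRace |= causedByFileAlreadyExists;
    }

    // Called on abort (cf. DeltaLakeMetadata.rollback in the trace)
    void rollback() {
        rollbackActions.forEach(Runnable::run);
        rollbackActions.clear();
    }

    public static void main(String[] args) {
        int[] deletions = {0};

        CreateTableRollback raceLoser = new CreateTableRollback();
        raceLoser.onBeginCreateTable(() -> deletions[0]++);
        raceLoser.onCommitFailure(true);     // FileAlreadyExistsException on version 0
        raceLoser.rollback();                // location is kept

        CreateTableRollback otherFailure = new CreateTableRollback();
        otherFailure.onBeginCreateTable(() -> deletions[0]++);
        otherFailure.onCommitFailure(false); // some unrelated failure
        otherFailure.rollback();             // location is deleted as before

        System.out.println("location deletions: " + deletions[0]); // 1
    }
}
```

The losing query's own data files still need removing; in the existing code that is the job of cleanupFailedWrite in the catch block, so only the shared table location is protected here.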

vinay-kl added the delta-lake (Delta Lake connector) and bug (Something isn't working) labels on Nov 17, 2024
@findinpath
Contributor

findinpath commented Nov 18, 2024

I see a FileAlreadyExistsException in the stack trace of the issue you've reported. The relevant catch block is:

catch (Exception e) {
    // Remove the transaction log entry if the table creation fails
    if (!writeCommitted) {
        // TODO perhaps it should happen in a background thread (https://github.com/trinodb/trino/issues/12011)
        cleanupFailedWrite(session, handle.location(), dataFileInfos);
    }
    if (handle.readVersion().isEmpty()) {
        Location transactionLogDir = Location.of(getTransactionLogDir(location));
        try {
            fileSystemFactory.create(session).deleteDirectory(transactionLogDir);
        }
        catch (IOException ioException) {
            // Nothing to do, the IOException is probably the same reason why the initial write failed
            LOG.error(ioException, "Transaction log cleanup failed during CREATE TABLE rollback");
        }
    }
    throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
}

We should likely check for this kind of failure (as we already do in io.trino.plugin.deltalake.transactionlog.writer.S3ConditionalWriteLogSynchronizer), maybe even more specifically around

transactionLogWriter.flush();

We should do this check so that, for this kind of failure, we avoid calling

fileSystemFactory.create(session).deleteDirectory(transactionLogDir);
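One way to read this suggestion, sketched under assumptions: wrap only the log flush in its own try block, translate a FileAlreadyExistsException raised there into a dedicated exception, and let the outer catch block skip deleting the transaction log directory in that case while still cleaning up the query's own data files. The LogWriter interface, the ConcurrentCreateException type, and the cleanupLog list are hypothetical stand-ins for transactionLogWriter, cleanupFailedWrite, and the deleteDirectory call; the sketch does not reproduce S3ConditionalWriteLogSynchronizer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.List;

final class FinishCreateTableSketch {
    // Hypothetical stand-in for the transaction log writer
    interface LogWriter {
        void flush();
    }

    // Hypothetical exception meaning "another query committed version 0 first"
    static final class ConcurrentCreateException extends RuntimeException {
        ConcurrentCreateException(Throwable cause) {
            super("Table was created concurrently by another query", cause);
        }
    }

    // Records which cleanup steps ran, in order
    final List<String> cleanupLog = new ArrayList<>();

    void finishCreateTable(LogWriter transactionLogWriter) {
        try {
            try {
                transactionLogWriter.flush();
            }
            catch (UncheckedIOException e) {
                // Narrow check right around the flush: lost race for the first log entry
                if (e.getCause() instanceof FileAlreadyExistsException) {
                    throw new ConcurrentCreateException(e);
                }
                throw e;
            }
        }
        catch (RuntimeException e) {
            cleanupLog.add("delete own data files");     // cf. cleanupFailedWrite
            if (!(e instanceof ConcurrentCreateException)) {
                cleanupLog.add("delete _delta_log");     // skipped when another writer owns the log
            }
            throw new RuntimeException("Failed to write Delta Lake transaction log entry", e);
        }
    }

    public static void main(String[] args) {
        FinishCreateTableSketch lostRace = new FinishCreateTableSketch();
        try {
            lostRace.finishCreateTable(() -> {
                throw new UncheckedIOException(new FileAlreadyExistsException("_delta_log/00000000000000000000.json"));
            });
        }
        catch (RuntimeException expected) {
            // The query still fails, but the winner's _delta_log is left alone
        }
        System.out.println(lostRace.cleanupLog); // [delete own data files]
    }
}
```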
