Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Deletion of Previous Metadata and Statistics Files #312

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d0cd456
delete manifest, manifest list, prev files, stats when drop table wit…
danielhumanmod Sep 22, 2024
26e03ac
unit test for drop table
danielhumanmod Sep 22, 2024
0f8e8f4
refine warning code
danielhumanmod Sep 22, 2024
1b525de
code format
danielhumanmod Sep 22, 2024
e8b26d2
refine warning code
danielhumanmod Sep 22, 2024
2ee6dee
remove unused code
danielhumanmod Sep 22, 2024
806f46d
remove unused import
danielhumanmod Sep 22, 2024
4f1d3c9
code format
danielhumanmod Sep 23, 2024
0a77bfa
remove additional manifest and manifest list deletion
danielhumanmod Sep 24, 2024
47dc60a
add stat deletion test
danielhumanmod Sep 24, 2024
9d835b3
code format
danielhumanmod Sep 24, 2024
40c6147
add new AsyncTaskType
danielhumanmod Oct 6, 2024
f354d1c
Schedule prev metadata and stat files deletion in seperated tasks
danielhumanmod Oct 6, 2024
88c6651
Table content cleanup task handler
danielhumanmod Oct 6, 2024
af3efab
Unit test for table clean up
danielhumanmod Oct 6, 2024
278ab7e
code format
danielhumanmod Oct 6, 2024
ed30fb0
register task handler
danielhumanmod Oct 7, 2024
05c3dd9
handler table content files in batch
danielhumanmod Oct 7, 2024
49dbe68
adjust unit test after batch processing
danielhumanmod Oct 7, 2024
8eea50d
add unit test for TableContentCleanupTaskHandler
danielhumanmod Oct 7, 2024
d9804e6
code format
danielhumanmod Oct 7, 2024
54511de
Merge branch 'main' into pr-289
danielhumanmod Oct 17, 2024
56ba4f2
Merge branch 'main' into pr-289
danielhumanmod Oct 26, 2024
e92852e
Merge branch 'main' into pr-289
danielhumanmod Nov 4, 2024
27ea1b3
merge cleanup tasks into one
danielhumanmod Nov 4, 2024
4d1b68b
Merge remote-tracking branch 'origin/pr-289' into pr-289
danielhumanmod Nov 4, 2024
eb533d7
code format
danielhumanmod Nov 4, 2024
47f760f
Merge branch 'main' into pr-289
flyrain Nov 9, 2024
988e530
refactor manifest cleanup handler based on comments
danielhumanmod Nov 14, 2024
4965d5c
refactor table cleanup handler based on comments
danielhumanmod Nov 14, 2024
5f81483
add TODO
danielhumanmod Nov 14, 2024
097189c
Merge branch 'pr-289' of https://github.com/danielhumanmod/polaris in…
danielhumanmod Nov 14, 2024
651ece0
Merge branch 'main' into pr-289
danielhumanmod Nov 14, 2024
16bb5fe
renaming
danielhumanmod Nov 14, 2024
d276ae6
split the task type in cleanup task handler
danielhumanmod Nov 20, 2024
187b47e
error handling
danielhumanmod Nov 20, 2024
ba5c47c
Merge branch 'main' into pr-289
danielhumanmod Nov 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import org.slf4j.LoggerFactory;

/**
* {@link TaskHandler} responsible for deleting all of the files in a manifest and the manifest
* itself. Since data files may be present in multiple manifests across different snapshots, we
* assume a data file that doesn't exist is missing because it was already deleted by another task.
* {@link TaskHandler} responsible for deleting table files: 1. Manifest files: It contains all the
* files in a manifest and the manifest itself. Since data files may be present in multiple
* manifests across different snapshots, we assume a data file that doesn't exist is missing because
* it was already deleted by another task. 2. Table content files: It contains previous metadata and
* statistics files, which are grouped and deleted in batch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to handle the same for partition statistics files also (in this PR or as a follow up)
more details: apache/iceberg#9409

Copy link
Author

@danielhumanmod danielhumanmod Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to handle the same for partition statistics files also (in this PR or as a follow up) more details: apache/iceberg#9409

Thanks for your suggestion! Would it be okay to address this in a separate PR avoid making this one too large? (Left a comment on the code, I can create an issue to track it if necessary)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

works for me.

*/
public class ManifestFileCleanupTaskHandler implements TaskHandler {
Copy link
Author

@danielhumanmod danielhumanmod Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming this task will triger lots of relevent changes. If a rename is needed, we may want to handle it in a separate PR to avoid too much changes (Leave a TODO here)

public static final int MAX_ATTEMPTS = 3;
Expand All @@ -68,58 +70,107 @@ public boolean canHandleTask(TaskEntity task) {
@Override
public boolean handleTask(TaskEntity task) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {

// if the file doesn't exist, we assume that another task execution was successful, but failed
// to drop the task entity. Log a warning and return success
if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) {
if (cleanupTask.getManifestFileData() != null) {
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
} else if (cleanupTask.getContentFileBatch() != null) {
return cleanUpContentFiles(cleanupTask.getContentFileBatch(), authorizedFileIO, tableId);
} else {
LOGGER
.atWarn()
.addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("tableId", tableId)
.log("Manifest cleanup task scheduled, but manifest file doesn't exist");
.log("Cleanup task scheduled, but input file doesn't exist");
return true;
}
}
}

ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, authorizedFileIO);
List<CompletableFuture<Void>> dataFileDeletes =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE),
false)
.map(
file ->
tryDelete(
tableId, authorizedFileIO, manifestFile, file.path().toString(), null, 1))
.toList();
LOGGER.debug(
"Scheduled {} data files to be deleted from manifest {}",
dataFileDeletes.size(),
manifestFile.path());
try {
// wait for all data files to be deleted, then wait for the manifest itself to be deleted
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
.thenCompose(
(v) -> {
LOGGER
.atInfo()
.addKeyValue("manifestFile", manifestFile.path())
.log("All data files in manifest deleted - deleting manifest");
return tryDelete(
tableId, authorizedFileIO, manifestFile, manifestFile.path(), null, 1);
})
.get();
return true;
} catch (InterruptedException e) {
LOGGER.error(
"Interrupted exception deleting data files from manifest {}", manifestFile.path(), e);
throw new RuntimeException(e);
} catch (ExecutionException e) {
LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e);
return false;
}
private boolean cleanUpManifestFile(
Copy link
Author

@danielhumanmod danielhumanmod Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the lots of changes here, but don’t worry—it’s mainly because I refactored the deletion logic for the manifest and all its data into a new method; no other changes were made in lines 91-135.

ManifestFile manifestFile, FileIO fileIO, TableIdentifier tableId) {
// if the file doesn't exist, we assume that another task execution was successful, but
// failed to drop the task entity. Log a warning and return success
if (!TaskUtils.exists(manifestFile.path(), fileIO)) {
LOGGER
.atWarn()
.addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("tableId", tableId)
.log("Manifest cleanup task scheduled, but manifest file doesn't exist");
return true;
}

ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, fileIO);
List<CompletableFuture<Void>> dataFileDeletes =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE),
false)
.map(file -> tryDelete(tableId, fileIO, manifestFile, file.path().toString(), null, 1))
.toList();
LOGGER.debug(
"Scheduled {} data files to be deleted from manifest {}",
dataFileDeletes.size(),
manifestFile.path());
try {
// wait for all data files to be deleted, then wait for the manifest itself to be deleted
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
.thenCompose(
(v) -> {
LOGGER
.atInfo()
.addKeyValue("manifestFile", manifestFile.path())
.log("All data files in manifest deleted - deleting manifest");
return tryDelete(tableId, fileIO, manifestFile, manifestFile.path(), null, 1);
})
.get();
return true;
} catch (InterruptedException e) {
LOGGER.error(
"Interrupted exception deleting data files from manifest {}", manifestFile.path(), e);
throw new RuntimeException(e);
} catch (ExecutionException e) {
LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e);
return false;
}
}

private boolean cleanUpContentFiles(
List<String> contentFileBatch, FileIO fileIO, TableIdentifier tableId) {
List<String> validFiles =
contentFileBatch.stream().filter(file -> TaskUtils.exists(file, fileIO)).toList();
if (validFiles.isEmpty()) {
LOGGER
.atWarn()
.addKeyValue("contentFileBatch", contentFileBatch.toString())
.addKeyValue("tableId", tableId)
.log("Table content cleanup task scheduled, but the none of the file in batch exists");
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to warn here if 1 out of 10 files doesn't exist?

}

// Schedule the deletion for each file asynchronously
List<CompletableFuture<Void>> deleteFutures =
validFiles.stream().map(file -> tryDelete(tableId, fileIO, null, file, null, 1)).toList();

// Wait for all delete operations to finish
try {
CompletableFuture<Void> allDeletes =
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
allDeletes.join();
} catch (Exception e) {
LOGGER
.atWarn()
.addKeyValue("contentFileBatch", contentFileBatch.toString())
.addKeyValue("tableId", tableId)
.log("Exception detected during content file batch deletion", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw here?

}

LOGGER
.atInfo()
.addKeyValue("contentFileBatch", contentFileBatch.toString())
.addKeyValue("tableId", tableId)
.log("Content file batch deletion has completed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can either remove it or make it debug level. An info level log may not be necessary.


return true;
}

private static ManifestFile decodeManifestData(String manifestFileData) {
Expand All @@ -134,16 +185,16 @@ private CompletableFuture<Void> tryDelete(
TableIdentifier tableId,
FileIO fileIO,
ManifestFile manifestFile,
String dataFile,
String file,
Throwable e,
int attempt) {
if (e != null && attempt <= MAX_ATTEMPTS) {
LOGGER
.atWarn()
.addKeyValue("dataFile", dataFile)
.addKeyValue("file", file)
.addKeyValue("attempt", attempt)
.addKeyValue("error", e.getMessage())
.log("Error encountered attempting to delete data file");
.log("Error encountered attempting to delete file");
}
if (attempt > MAX_ATTEMPTS && e != null) {
return CompletableFuture.failedFuture(e);
Expand All @@ -155,27 +206,27 @@ private CompletableFuture<Void> tryDelete(
// file's existence, but then it is deleted before we have a chance to
// send the delete request. In such a case, we <i>should</i> retry
// and find
if (TaskUtils.exists(dataFile, fileIO)) {
fileIO.deleteFile(dataFile);
if (TaskUtils.exists(file, fileIO)) {
fileIO.deleteFile(file);
} else {
LOGGER
.atInfo()
.addKeyValue("dataFile", dataFile)
.addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("file", file)
.addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "")
.addKeyValue("tableId", tableId)
.log("Manifest cleanup task scheduled, but data file doesn't exist");
.log("table file cleanup task scheduled, but data file doesn't exist");
}
},
executorService)
.exceptionallyComposeAsync(
newEx -> {
LOGGER
.atWarn()
.addKeyValue("dataFile", dataFile)
.addKeyValue("tableIdentifer", tableId)
.addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("dataFile", file)
.addKeyValue("tableIdentifier", tableId)
.addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "")
.log("Exception caught deleting data file from manifest", newEx);
return tryDelete(tableId, fileIO, manifestFile, dataFile, newEx, attempt + 1);
return tryDelete(tableId, fileIO, manifestFile, file, newEx, attempt + 1);
},
CompletableFuture.delayedExecutor(
FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService));
Expand All @@ -185,12 +236,18 @@ private CompletableFuture<Void> tryDelete(
public static final class ManifestCleanupTask {
private TableIdentifier tableId;
private String manifestFileData;
private List<String> contentFileBatch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename here


public ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) {
this.tableId = tableId;
this.manifestFileData = manifestFileData;
}

public ManifestCleanupTask(TableIdentifier tableId, List<String> contentFileBatch) {
this.tableId = tableId;
this.contentFileBatch = contentFileBatch;
}

public ManifestCleanupTask() {}

public TableIdentifier getTableId() {
Expand All @@ -209,17 +266,26 @@ public void setManifestFileData(String manifestFileData) {
this.manifestFileData = manifestFileData;
}

public List<String> getContentFileBatch() {
return contentFileBatch;
}

public void setContentFileBatch(List<String> contentFileBatch) {
this.contentFileBatch = contentFileBatch;
}

@Override
public boolean equals(Object object) {
if (this == object) return true;
if (!(object instanceof ManifestCleanupTask that)) return false;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(manifestFileData, that.manifestFileData);
&& Objects.equals(manifestFileData, that.manifestFileData)
&& Objects.equals(contentFileBatch, that.contentFileBatch);
}

@Override
public int hashCode() {
return Objects.hash(tableId, manifestFileData);
return Objects.hash(tableId, manifestFileData, contentFileBatch);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.polaris.service.task;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;
Expand All @@ -49,6 +52,7 @@ public class TableCleanupTaskHandler implements TaskHandler {
private final TaskExecutor taskExecutor;
private final MetaStoreManagerFactory metaStoreManagerFactory;
private final Function<TaskEntity, FileIO> fileIOSupplier;
private static final int BATCH_SIZE = 10;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tentatively set the BATCH_SIZE as 10, please feel free to let me know if there is a better option

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the PolarisConfigurationStore to control this value


public TableCleanupTaskHandler(
TaskExecutor taskExecutor,
Expand Down Expand Up @@ -102,7 +106,7 @@ public boolean handleTask(TaskEntity cleanupTask) {
// read the manifest list for each snapshot. dedupe the manifest files and schedule a
// cleanupTask
// for each manifest file and its data files to be deleted
List<TaskEntity> taskEntities =
danielhumanmod marked this conversation as resolved.
Show resolved Hide resolved
Stream<TaskEntity> manifestCleanupTasks =
tableMetadata.snapshots().stream()
.flatMap(sn -> sn.allManifests(fileIO).stream())
// distinct by manifest path, since multiple snapshots will contain the same
Expand Down Expand Up @@ -142,8 +146,40 @@ public boolean handleTask(TaskEntity cleanupTask) {
// copy the internal properties, which will have storage info
.setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
.build();
})
.toList();
});

Stream<TaskEntity> contentFileCleanupTasks =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> metadataFileCleanupTask

getContentFileBatch(tableMetadata, fileIO).stream()
.map(
fileBatch -> {
String taskName =
String.join(
"_",
cleanupTask.getName(),
fileBatch.toString(),
UUID.randomUUID().toString());
LOGGER
.atDebug()
.addKeyValue("taskName", taskName)
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("fileBatch", fileBatch)
.log(
"Queueing task to delete content file (prev metadata and statistics files)");
return new TaskEntity.Builder()
.setName(taskName)
.setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId())
.setCreateTimestamp(polarisCallContext.getClock().millis())
.withTaskType(AsyncTaskType.FILE_CLEANUP)
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableEntity.getTableIdentifier(), fileBatch))
.setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
.build();
});

List<TaskEntity> taskEntities =
Stream.concat(manifestCleanupTasks, contentFileCleanupTasks).toList();

List<PolarisBaseEntity> createdTasks =
metaStoreManager
.createEntitiesIfNotExist(polarisCallContext, null, taskEntities)
Expand All @@ -154,15 +190,32 @@ public boolean handleTask(TaskEntity cleanupTask) {
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("metadataLocation", tableEntity.getMetadataLocation())
.addKeyValue("taskCount", taskEntities.size())
.log("Successfully queued tasks to delete manifests - deleting table metadata file");
.log(
"Successfully queued tasks to delete manifests, previous metadata, and statistics files - deleting table metadata file");
for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(), CallContext.getCurrentContext());
}

fileIO.deleteFile(tableEntity.getMetadataLocation());

return true;
}
}
return false;
}

Copy link
Author

@danielhumanmod danielhumanmod Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract previous manifest task creation into a new method, no new change added for line 152 - 201

private List<List<String>> getContentFileBatch(TableMetadata tableMetadata, FileIO fileIO) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name contentFile in Iceberg refers to a data file or a delete file. Let's pick another name to avoid confusion, as metadata files and statistic files are not content files. How about metadataFile?

List<List<String>> result = new ArrayList<>();
List<String> contentFiles =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename here

Stream.concat(
tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
.filter(file -> TaskUtils.exists(file, fileIO))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check the file existence here? It will be checked at delete time, right?

.toList();

for (int i = 0; i < contentFiles.size(); i += BATCH_SIZE) {
result.add(contentFiles.subList(i, Math.min(i + BATCH_SIZE, contentFiles.size())));
}
return result;
}
}
Loading