-
Notifications
You must be signed in to change notification settings - Fork 132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async Deletion of Previous Metadata and Statistics Files #312
Open
danielhumanmod
wants to merge
37
commits into
apache:main
Choose a base branch
from
danielhumanmod:pr-289
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
d0cd456
delete manifest, manifest list, prev files, stats when drop table wit…
danielhumanmod 26e03ac
unit test for drop table
danielhumanmod 0f8e8f4
refine warning code
danielhumanmod 1b525de
code format
danielhumanmod e8b26d2
refine warning code
danielhumanmod 2ee6dee
remove unused code
danielhumanmod 806f46d
remove unused import
danielhumanmod 4f1d3c9
code format
danielhumanmod 0a77bfa
remove additional manifest and manifest list deletion
danielhumanmod 47dc60a
add stat deletion test
danielhumanmod 9d835b3
code format
danielhumanmod 40c6147
add new AsyncTaskType
danielhumanmod f354d1c
Schedule prev metadata and stat files deletion in seperated tasks
danielhumanmod 88c6651
Table content cleanup task handler
danielhumanmod af3efab
Unit test for table clean up
danielhumanmod 278ab7e
code format
danielhumanmod ed30fb0
register task handler
danielhumanmod 05c3dd9
handler table content files in batch
danielhumanmod 49dbe68
adjust unit test after batch processing
danielhumanmod 8eea50d
add unit test for TableContentCleanupTaskHandler
danielhumanmod d9804e6
code format
danielhumanmod 54511de
Merge branch 'main' into pr-289
danielhumanmod 56ba4f2
Merge branch 'main' into pr-289
danielhumanmod e92852e
Merge branch 'main' into pr-289
danielhumanmod 27ea1b3
merge cleanup tasks into one
danielhumanmod 4d1b68b
Merge remote-tracking branch 'origin/pr-289' into pr-289
danielhumanmod eb533d7
code format
danielhumanmod 47f760f
Merge branch 'main' into pr-289
flyrain 988e530
refactor manifest cleanup handler based on comments
danielhumanmod 4965d5c
refactor table cleanup handler based on comments
danielhumanmod 5f81483
add TODO
danielhumanmod 097189c
Merge branch 'pr-289' of https://github.com/danielhumanmod/polaris in…
danielhumanmod 651ece0
Merge branch 'main' into pr-289
danielhumanmod 16bb5fe
renaming
danielhumanmod d276ae6
split the task type in cleanup task handler
danielhumanmod 187b47e
error handling
danielhumanmod ba5c47c
Merge branch 'main' into pr-289
danielhumanmod File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,10 +42,13 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* {@link TaskHandler} responsible for deleting all of the files in a manifest and the manifest | ||
* itself. Since data files may be present in multiple manifests across different snapshots, we | ||
* assume a data file that doesn't exist is missing because it was already deleted by another task. | ||
* {@link TaskHandler} responsible for deleting table files: 1. Manifest files: It contains all the | ||
* files in a manifest and the manifest itself. Since data files may be present in multiple | ||
* manifests across different snapshots, we assume a data file that doesn't exist is missing because | ||
* it was already deleted by another task. 2. Table metadata files: It contains previous metadata | ||
* and statistics files, which are grouped and deleted in batch | ||
*/ | ||
// TODO: Rename this class since we introducing metadata cleanup here | ||
public class ManifestFileCleanupTaskHandler implements TaskHandler { | ||
public static final int MAX_ATTEMPTS = 3; | ||
public static final int FILE_DELETION_RETRY_MILLIS = 100; | ||
|
@@ -62,66 +65,119 @@ public ManifestFileCleanupTaskHandler( | |
|
||
@Override | ||
public boolean canHandleTask(TaskEntity task) { | ||
return task.getTaskType() == AsyncTaskType.FILE_CLEANUP; | ||
return task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP | ||
|| task.getTaskType() == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP; | ||
} | ||
|
||
@Override | ||
public boolean handleTask(TaskEntity task) { | ||
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class); | ||
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData()); | ||
TableIdentifier tableId = cleanupTask.getTableId(); | ||
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) { | ||
|
||
// if the file doesn't exist, we assume that another task execution was successful, but failed | ||
// to drop the task entity. Log a warning and return success | ||
if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) { | ||
if (task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP) { | ||
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData()); | ||
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId); | ||
} else if (task.getTaskType() == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP) { | ||
return cleanUpMetadataFiles(cleanupTask.getMetadataFiles(), authorizedFileIO, tableId); | ||
} else { | ||
LOGGER | ||
.atWarn() | ||
.addKeyValue("manifestFile", manifestFile.path()) | ||
.addKeyValue("tableId", tableId) | ||
.log("Manifest cleanup task scheduled, but manifest file doesn't exist"); | ||
return true; | ||
} | ||
|
||
ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, authorizedFileIO); | ||
List<CompletableFuture<Void>> dataFileDeletes = | ||
StreamSupport.stream( | ||
Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE), | ||
false) | ||
.map( | ||
file -> | ||
tryDelete( | ||
tableId, authorizedFileIO, manifestFile, file.path().toString(), null, 1)) | ||
.toList(); | ||
LOGGER.debug( | ||
"Scheduled {} data files to be deleted from manifest {}", | ||
dataFileDeletes.size(), | ||
manifestFile.path()); | ||
try { | ||
// wait for all data files to be deleted, then wait for the manifest itself to be deleted | ||
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new)) | ||
.thenCompose( | ||
(v) -> { | ||
LOGGER | ||
.atInfo() | ||
.addKeyValue("manifestFile", manifestFile.path()) | ||
.log("All data files in manifest deleted - deleting manifest"); | ||
return tryDelete( | ||
tableId, authorizedFileIO, manifestFile, manifestFile.path(), null, 1); | ||
}) | ||
.get(); | ||
return true; | ||
} catch (InterruptedException e) { | ||
LOGGER.error( | ||
"Interrupted exception deleting data files from manifest {}", manifestFile.path(), e); | ||
throw new RuntimeException(e); | ||
} catch (ExecutionException e) { | ||
LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e); | ||
.log("Unknown task type {}", task.getTaskType()); | ||
return false; | ||
} | ||
} | ||
} | ||
|
||
private boolean cleanUpManifestFile( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the lots of changes here, but don’t worry—it’s mainly because I refactored the deletion logic for the manifest and all its data into a new method; no other changes were made in lines 91-135. |
||
ManifestFile manifestFile, FileIO fileIO, TableIdentifier tableId) { | ||
// if the file doesn't exist, we assume that another task execution was successful, but | ||
// failed to drop the task entity. Log a warning and return success | ||
if (!TaskUtils.exists(manifestFile.path(), fileIO)) { | ||
LOGGER | ||
.atWarn() | ||
.addKeyValue("manifestFile", manifestFile.path()) | ||
.addKeyValue("tableId", tableId) | ||
.log("Manifest cleanup task scheduled, but manifest file doesn't exist"); | ||
return true; | ||
} | ||
|
||
ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, fileIO); | ||
List<CompletableFuture<Void>> dataFileDeletes = | ||
StreamSupport.stream( | ||
Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE), | ||
false) | ||
.map(file -> tryDelete(tableId, fileIO, manifestFile, file.path().toString(), null, 1)) | ||
.toList(); | ||
LOGGER.debug( | ||
"Scheduled {} data files to be deleted from manifest {}", | ||
dataFileDeletes.size(), | ||
manifestFile.path()); | ||
try { | ||
// wait for all data files to be deleted, then wait for the manifest itself to be deleted | ||
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new)) | ||
.thenCompose( | ||
(v) -> { | ||
LOGGER | ||
.atInfo() | ||
.addKeyValue("manifestFile", manifestFile.path()) | ||
.log("All data files in manifest deleted - deleting manifest"); | ||
return tryDelete(tableId, fileIO, manifestFile, manifestFile.path(), null, 1); | ||
}) | ||
.get(); | ||
return true; | ||
} catch (InterruptedException e) { | ||
LOGGER.error( | ||
"Interrupted exception deleting data files from manifest {}", manifestFile.path(), e); | ||
throw new RuntimeException(e); | ||
} catch (ExecutionException e) { | ||
LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e); | ||
return false; | ||
} | ||
} | ||
|
||
private boolean cleanUpMetadataFiles( | ||
List<String> metadataFiles, FileIO fileIO, TableIdentifier tableId) { | ||
List<String> validFiles = | ||
metadataFiles.stream().filter(file -> TaskUtils.exists(file, fileIO)).toList(); | ||
if (validFiles.isEmpty()) { | ||
LOGGER | ||
.atWarn() | ||
.addKeyValue("metadataFiles", metadataFiles.toString()) | ||
.addKeyValue("tableId", tableId) | ||
.log("Table metadata cleanup task scheduled, but the none of the file in batch exists"); | ||
return true; | ||
} | ||
if (validFiles.size() < metadataFiles.size()) { | ||
List<String> missingFiles = | ||
metadataFiles.stream().filter(file -> !TaskUtils.exists(file, fileIO)).toList(); | ||
LOGGER | ||
.atWarn() | ||
.addKeyValue("metadataFiles", metadataFiles.toString()) | ||
.addKeyValue("missingFiles", missingFiles) | ||
.addKeyValue("tableId", tableId) | ||
.log( | ||
"Table metadata cleanup task scheduled, but {} files in the batch are missing", | ||
missingFiles.size()); | ||
} | ||
|
||
// Schedule the deletion for each file asynchronously | ||
List<CompletableFuture<Void>> deleteFutures = | ||
validFiles.stream().map(file -> tryDelete(tableId, fileIO, null, file, null, 1)).toList(); | ||
|
||
try { | ||
// Wait for all delete operations to finish | ||
CompletableFuture<Void> allDeletes = | ||
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); | ||
allDeletes.join(); | ||
} catch (Exception e) { | ||
LOGGER.error("Exception detected during metadata file deletion", e); | ||
return false; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
private static ManifestFile decodeManifestData(String manifestFileData) { | ||
try { | ||
return ManifestFiles.decode(Base64.decodeBase64(manifestFileData)); | ||
|
@@ -134,16 +190,16 @@ private CompletableFuture<Void> tryDelete( | |
TableIdentifier tableId, | ||
FileIO fileIO, | ||
ManifestFile manifestFile, | ||
String dataFile, | ||
String file, | ||
Throwable e, | ||
int attempt) { | ||
if (e != null && attempt <= MAX_ATTEMPTS) { | ||
LOGGER | ||
.atWarn() | ||
.addKeyValue("dataFile", dataFile) | ||
.addKeyValue("file", file) | ||
.addKeyValue("attempt", attempt) | ||
.addKeyValue("error", e.getMessage()) | ||
.log("Error encountered attempting to delete data file"); | ||
.log("Error encountered attempting to delete file"); | ||
} | ||
if (attempt > MAX_ATTEMPTS && e != null) { | ||
return CompletableFuture.failedFuture(e); | ||
|
@@ -155,27 +211,27 @@ private CompletableFuture<Void> tryDelete( | |
// file's existence, but then it is deleted before we have a chance to | ||
// send the delete request. In such a case, we <i>should</i> retry | ||
// and find | ||
if (TaskUtils.exists(dataFile, fileIO)) { | ||
fileIO.deleteFile(dataFile); | ||
if (TaskUtils.exists(file, fileIO)) { | ||
fileIO.deleteFile(file); | ||
} else { | ||
LOGGER | ||
.atInfo() | ||
.addKeyValue("dataFile", dataFile) | ||
.addKeyValue("manifestFile", manifestFile.path()) | ||
.addKeyValue("file", file) | ||
.addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "") | ||
.addKeyValue("tableId", tableId) | ||
.log("Manifest cleanup task scheduled, but data file doesn't exist"); | ||
.log("table file cleanup task scheduled, but data file doesn't exist"); | ||
} | ||
}, | ||
executorService) | ||
.exceptionallyComposeAsync( | ||
newEx -> { | ||
LOGGER | ||
.atWarn() | ||
.addKeyValue("dataFile", dataFile) | ||
.addKeyValue("tableIdentifer", tableId) | ||
.addKeyValue("manifestFile", manifestFile.path()) | ||
.addKeyValue("dataFile", file) | ||
.addKeyValue("tableIdentifier", tableId) | ||
.addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "") | ||
.log("Exception caught deleting data file from manifest", newEx); | ||
return tryDelete(tableId, fileIO, manifestFile, dataFile, newEx, attempt + 1); | ||
return tryDelete(tableId, fileIO, manifestFile, file, newEx, attempt + 1); | ||
}, | ||
CompletableFuture.delayedExecutor( | ||
FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService)); | ||
|
@@ -185,12 +241,18 @@ private CompletableFuture<Void> tryDelete( | |
public static final class ManifestCleanupTask { | ||
private TableIdentifier tableId; | ||
private String manifestFileData; | ||
private List<String> metadataFiles; | ||
|
||
public ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) { | ||
this.tableId = tableId; | ||
this.manifestFileData = manifestFileData; | ||
} | ||
|
||
public ManifestCleanupTask(TableIdentifier tableId, List<String> metadataFiles) { | ||
this.tableId = tableId; | ||
this.metadataFiles = metadataFiles; | ||
} | ||
|
||
public ManifestCleanupTask() {} | ||
|
||
public TableIdentifier getTableId() { | ||
|
@@ -209,17 +271,26 @@ public void setManifestFileData(String manifestFileData) { | |
this.manifestFileData = manifestFileData; | ||
} | ||
|
||
public List<String> getMetadataFiles() { | ||
return metadataFiles; | ||
} | ||
|
||
public void setMetadataFiles(List<String> metadataFiles) { | ||
this.metadataFiles = metadataFiles; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object object) { | ||
if (this == object) return true; | ||
if (!(object instanceof ManifestCleanupTask that)) return false; | ||
return Objects.equals(tableId, that.tableId) | ||
&& Objects.equals(manifestFileData, that.manifestFileData); | ||
&& Objects.equals(manifestFileData, that.manifestFileData) | ||
&& Objects.equals(metadataFiles, that.metadataFiles); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(tableId, manifestFileData); | ||
return Objects.hash(tableId, manifestFileData, metadataFiles); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming this task will triger lots of relevent changes. If a rename is needed, we may want to handle it in a separate PR to avoid too much changes (Leave a TODO here)