Skip to content

Commit

Permalink
[core] add deletedFileTotalSizeInBytes in result of OrphanFilesClean (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored Nov 26, 2024
1 parent ca4af64 commit 8d57d3d
Show file tree
Hide file tree
Showing 13 changed files with 291 additions and 132 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.apache.paimon.fs.Path;

import java.util.List;

/**
 * The result of OrphanFilesClean.
 *
 * <p>Carries the number of deleted files, their total length in bytes and, optionally, the paths
 * of the deleted files. All fields are assigned exactly once at construction time.
 */
public class CleanOrphanFilesResult {

    /** Paths of the deleted files; {@code null} when the result only carries summary numbers. */
    private final List<Path> deletedFilesPath;

    private final long deletedFileCount;
    private final long deletedFileTotalLenInBytes;

    /**
     * Creates a summary-only result without per-file paths.
     *
     * <p>{@link #getDeletedFilesPath()} returns {@code null} for results built this way.
     *
     * @param deletedFileCount number of deleted files
     * @param deletedFileTotalLenInBytes total length of the deleted files, in bytes
     */
    public CleanOrphanFilesResult(long deletedFileCount, long deletedFileTotalLenInBytes) {
        this(null, deletedFileCount, deletedFileTotalLenInBytes);
    }

    /**
     * Creates a result that also exposes the paths of the deleted files.
     *
     * <p>The given list is stored as-is (no defensive copy), so later changes made by the caller
     * remain visible through {@link #getDeletedFilesPath()}.
     *
     * @param deletedFilesPath paths of the deleted files, may be {@code null}
     * @param deletedFileCount number of deleted files
     * @param deletedFileTotalLenInBytes total length of the deleted files, in bytes
     */
    public CleanOrphanFilesResult(
            List<Path> deletedFilesPath, long deletedFileCount, long deletedFileTotalLenInBytes) {
        this.deletedFilesPath = deletedFilesPath;
        this.deletedFileCount = deletedFileCount;
        this.deletedFileTotalLenInBytes = deletedFileTotalLenInBytes;
    }

    /** Returns the number of deleted files. */
    public long getDeletedFileCount() {
        return deletedFileCount;
    }

    /** Returns the total length of the deleted files, in bytes. */
    public long getDeletedFileTotalLenInBytes() {
        return deletedFileTotalLenInBytes;
    }

    /**
     * Returns the paths of the deleted files, or {@code null} if this result was created with the
     * summary-only constructor.
     */
    public List<Path> getDeletedFilesPath() {
        return deletedFilesPath;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableConsumer;

import javax.annotation.Nullable;
Expand All @@ -47,6 +47,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -68,6 +69,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {

private final List<Path> deleteFiles;

private final AtomicLong deletedFilesLenInBytes = new AtomicLong(0);

private Set<String> candidateDeletes;

public LocalOrphanFilesClean(FileStoreTable table) {
Expand All @@ -87,16 +90,18 @@ public LocalOrphanFilesClean(
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
}

public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
public CleanOrphanFilesResult clean()
throws IOException, ExecutionException, InterruptedException {
List<String> branches = validBranches();

// specially handle to clear snapshot dir
cleanSnapshotDir(branches, deleteFiles::add);
cleanSnapshotDir(branches, deleteFiles::add, deletedFilesLenInBytes::addAndGet);

// delete candidate files
Map<String, Path> candidates = getCandidateDeletingFiles();
Map<String, Pair<Path, Long>> candidates = getCandidateDeletingFiles();
if (candidates.isEmpty()) {
return deleteFiles;
return new CleanOrphanFilesResult(
deleteFiles, deleteFiles.size(), deletedFilesLenInBytes.get());
}
candidateDeletes = new HashSet<>(candidates.keySet());

Expand All @@ -108,12 +113,22 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc

// delete unused files
candidateDeletes.removeAll(usedFiles);
candidateDeletes.stream().map(candidates::get).forEach(fileCleaner);
candidateDeletes.stream()
.map(candidates::get)
.forEach(
deleteFileInfo -> {
deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
fileCleaner.accept(deleteFileInfo.getLeft());
});
deleteFiles.addAll(
candidateDeletes.stream().map(candidates::get).collect(Collectors.toList()));
candidateDeletes.stream()
.map(candidates::get)
.map(Pair::getLeft)
.collect(Collectors.toList()));
candidateDeletes.clear();

return deleteFiles;
return new CleanOrphanFilesResult(
deleteFiles, deleteFiles.size(), deletedFilesLenInBytes.get());
}

private void collectWithoutDataFile(
Expand Down Expand Up @@ -172,19 +187,20 @@ private Set<String> getUsedFiles(String branch) {
* Get all the candidate deleting files in the specified directories and filter them by
* olderThanMillis.
*/
private Map<String, Path> getCandidateDeletingFiles() {
private Map<String, Pair<Path, Long>> getCandidateDeletingFiles() {
List<Path> fileDirs = listPaimonFileDirs();
Function<Path, List<Path>> processor =
Function<Path, List<Pair<Path, Long>>> processor =
path ->
tryBestListingDirs(path).stream()
.filter(this::oldEnough)
.map(FileStatus::getPath)
.map(status -> Pair.of(status.getPath(), status.getLen()))
.collect(Collectors.toList());
Iterator<Path> allPaths = randomlyExecuteSequentialReturn(executor, processor, fileDirs);
Map<String, Path> result = new HashMap<>();
while (allPaths.hasNext()) {
Path next = allPaths.next();
result.put(next.getName(), next);
Iterator<Pair<Path, Long>> allFilesInfo =
randomlyExecuteSequentialReturn(executor, processor, fileDirs);
Map<String, Pair<Path, Long>> result = new HashMap<>();
while (allFilesInfo.hasNext()) {
Pair<Path, Long> fileInfo = allFilesInfo.next();
result.put(fileInfo.getLeft().getName(), fileInfo);
}
return result;
}
Expand All @@ -197,7 +213,6 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
SerializableConsumer<Path> fileCleaner,
@Nullable Integer parallelism)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>();
List<String> tableNames = Collections.singletonList(tableName);
if (tableName == null || "*".equals(tableName)) {
tableNames = catalog.listTables(databaseName);
Expand All @@ -214,6 +229,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
}
};

List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>(tableNames.size());
for (String t : tableNames) {
Identifier identifier = new Identifier(databaseName, t);
Table table = catalog.getTable(identifier).copy(dynamicOptions);
Expand All @@ -230,7 +246,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
return orphanFilesCleans;
}

public static long executeDatabaseOrphanFiles(
public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
Catalog catalog,
String databaseName,
@Nullable String tableName,
Expand All @@ -249,15 +265,17 @@ public static long executeDatabaseOrphanFiles(

ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<List<Path>>> tasks = new ArrayList<>();
List<Future<CleanOrphanFilesResult>> tasks = new ArrayList<>(tableCleans.size());
for (LocalOrphanFilesClean clean : tableCleans) {
tasks.add(executorService.submit(clean::clean));
}

List<Path> cleanOrphanFiles = new ArrayList<>();
for (Future<List<Path>> task : tasks) {
long deletedFileCount = 0;
long deletedFileTotalLenInBytes = 0;
for (Future<CleanOrphanFilesResult> task : tasks) {
try {
cleanOrphanFiles.addAll(task.get());
deletedFileCount += task.get().getDeletedFileCount();
deletedFileTotalLenInBytes += task.get().getDeletedFileTotalLenInBytes();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand All @@ -267,6 +285,6 @@ public static long executeDatabaseOrphanFiles(
}

executorService.shutdownNow();
return cleanOrphanFiles.size();
return new CleanOrphanFilesResult(deletedFileCount, deletedFileTotalLenInBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,47 @@ protected List<String> validBranches() {
return branches;
}

protected void cleanSnapshotDir(List<String> branches, Consumer<Path> deletedFileConsumer) {
protected void cleanSnapshotDir(
List<String> branches,
Consumer<Path> deletedFilesConsumer,
Consumer<Long> deletedFilesLenInBytesConsumer) {
for (String branch : branches) {
FileStoreTable branchTable = table.switchToBranch(branch);
SnapshotManager snapshotManager = branchTable.snapshotManager();

// specially handle the snapshot directory
List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
nonSnapshotFiles.forEach(fileCleaner);
nonSnapshotFiles.forEach(deletedFileConsumer);
List<Pair<Path, Long>> nonSnapshotFiles =
snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
nonSnapshotFiles.forEach(
nonSnapshotFile ->
cleanFile(
nonSnapshotFile,
deletedFilesConsumer,
deletedFilesLenInBytesConsumer));

// specially handle the changelog directory
List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
nonChangelogFiles.forEach(fileCleaner);
nonChangelogFiles.forEach(deletedFileConsumer);
List<Pair<Path, Long>> nonChangelogFiles =
snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
nonChangelogFiles.forEach(
nonChangelogFile ->
cleanFile(
nonChangelogFile,
deletedFilesConsumer,
deletedFilesLenInBytesConsumer));
}
}

/**
 * Reports a single file to the given consumers and then hands it to {@code fileCleaner}.
 *
 * <p>The path and the length are reported before the cleaner is invoked, in that order.
 *
 * @param deleteFileInfo pair of the file path (left) and the file length in bytes (right)
 * @param deletedFilesConsumer receives the path of the file
 * @param deletedFilesLenInBytesConsumer receives the length of the file
 */
private void cleanFile(
        Pair<Path, Long> deleteFileInfo,
        Consumer<Path> deletedFilesConsumer,
        Consumer<Long> deletedFilesLenInBytesConsumer) {
    final Path target = deleteFileInfo.getLeft();
    // Bookkeeping first: record the path, then its length.
    deletedFilesConsumer.accept(target);
    deletedFilesLenInBytesConsumer.accept(deleteFileInfo.getRight());
    // Finally pass the file to the configured cleaner.
    fileCleaner.accept(target);
}

protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
FileStoreTable branchTable = table.switchToBranch(branch);
SnapshotManager snapshotManager = branchTable.snapshotManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,15 @@ private void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
* Try to get non snapshot files. If any error occurred, just ignore it and return an empty
* result.
*/
public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
public List<Pair<Path, Long>> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter());
}

public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
public List<Pair<Path, Long>> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
return listPathWithFilter(changelogDirectory(), fileStatusFilter, nonChangelogFileFilter());
}

private List<Path> listPathWithFilter(
private List<Pair<Path, Long>> listPathWithFilter(
Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
try {
FileStatus[] statuses = fileIO.listStatus(directory);
Expand All @@ -581,8 +581,8 @@ private List<Path> listPathWithFilter(

return Arrays.stream(statuses)
.filter(fileStatusFilter)
.map(FileStatus::getPath)
.filter(fileFilter)
.filter(status -> fileFilter.test(status.getPath()))
.map(status -> Pair.of(status.getPath(), status.getLen()))
.collect(Collectors.toList());
} catch (IOException ignored) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,20 @@ public void testNormallyRemoving() throws Throwable {

// randomly delete tags
List<String> deleteTags = Collections.emptyList();
if (!allTags.isEmpty()) {
deleteTags = randomlyPick(allTags);
for (String tagName : deleteTags) {
table.deleteTag(tagName);
}
deleteTags = randomlyPick(allTags);
for (String tagName : deleteTags) {
table.deleteTag(tagName);
}

// first check, nothing will be deleted because the default olderThan interval is 1 day
LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);

// second check
orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
List<Path> deleted = orphanFilesClean.clean();
List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
try {
validate(deleted, snapshotData, new HashMap<>());
} catch (Throwable t) {
Expand Down Expand Up @@ -363,13 +361,13 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer)

// first check, nothing will be deleted because the default olderThan interval is 1 day
LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);

// second check
orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
List<Path> deleted = orphanFilesClean.clean();
List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
validate(deleted, snapshotData, changelogData);
}

Expand Down Expand Up @@ -399,7 +397,7 @@ public void testAbnormallyRemoving() throws Exception {
LocalOrphanFilesClean orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
assertThat(orphanFilesClean.clean().size()).isGreaterThan(0);
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0);
}

private void writeData(
Expand Down
Loading

0 comments on commit 8d57d3d

Please sign in to comment.