-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support changelog scan for table with delete files #10935
base: main
Are you sure you want to change the base?
Changes from all commits
0f672e5
49619eb
7f3ea71
6a9d413
760adb2
4300449
4312368
14b8f48
98902eb
ab7044e
2000982
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,16 +19,14 @@ | |
package org.apache.iceberg; | ||
|
||
import java.util.ArrayDeque; | ||
import java.util.Collection; | ||
import java.util.Deque; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import org.apache.iceberg.ManifestGroup.CreateTasksFunction; | ||
import org.apache.iceberg.ManifestGroup.TaskContext; | ||
import org.apache.iceberg.io.CloseableIterable; | ||
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; | ||
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
import org.apache.iceberg.util.SnapshotUtil; | ||
import org.apache.iceberg.util.TableScanUtil; | ||
|
@@ -63,33 +61,39 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles( | |
return CloseableIterable.empty(); | ||
} | ||
|
||
Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots); | ||
Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); | ||
|
||
Set<ManifestFile> newDataManifests = | ||
Iterable<CloseableIterable<ChangelogScanTask>> plans = | ||
FluentIterable.from(changelogSnapshots) | ||
.transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) | ||
.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) | ||
.toSet(); | ||
|
||
ManifestGroup manifestGroup = | ||
new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) | ||
.specsById(table().specs()) | ||
.caseSensitive(isCaseSensitive()) | ||
.select(scanColumns()) | ||
.filterData(filter()) | ||
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) | ||
.ignoreExisting() | ||
.columnsToKeepStats(columnsToKeepStats()); | ||
|
||
if (shouldIgnoreResiduals()) { | ||
manifestGroup = manifestGroup.ignoreResiduals(); | ||
} | ||
|
||
if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { | ||
manifestGroup = manifestGroup.planWith(planExecutor()); | ||
} | ||
|
||
return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots)); | ||
.transform( | ||
snapshot -> { | ||
List<ManifestFile> dataManifests = snapshot.dataManifests(table().io()); | ||
List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io()); | ||
|
||
ManifestGroup manifestGroup = | ||
new ManifestGroup(table().io(), dataManifests, deleteManifests) | ||
.specsById(table().specs()) | ||
.caseSensitive(isCaseSensitive()) | ||
.select(scanColumns()) | ||
.filterData(filter()) | ||
.columnsToKeepStats(columnsToKeepStats()); | ||
|
||
if (shouldIgnoreResiduals()) { | ||
manifestGroup = manifestGroup.ignoreResiduals(); | ||
} | ||
|
||
if (dataManifests.size() > 1 && shouldPlanWithExecutor()) { | ||
pvary marked this conversation as resolved.
Show resolved
Hide resolved
|
||
manifestGroup = manifestGroup.planWith(planExecutor()); | ||
} | ||
|
||
long snapshotId = snapshot.snapshotId(); | ||
long sequenceNumber = snapshot.sequenceNumber(); | ||
int changeOrdinal = snapshotOrdinals.get(snapshotId); | ||
return manifestGroup.plan( | ||
new CreateDataFileChangeTasks(snapshotId, sequenceNumber, changeOrdinal)); | ||
}); | ||
|
||
return CloseableIterable.concat(plans); | ||
} | ||
|
||
@Override | ||
|
@@ -105,22 +109,13 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl | |
|
||
for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) { | ||
if (!snapshot.operation().equals(DataOperations.REPLACE)) { | ||
if (!snapshot.deleteManifests(table().io()).isEmpty()) { | ||
throw new UnsupportedOperationException( | ||
"Delete files are currently not supported in changelog scans"); | ||
} | ||
|
||
changelogSnapshots.addFirst(snapshot); | ||
} | ||
} | ||
|
||
return changelogSnapshots; | ||
} | ||
|
||
private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) { | ||
return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); | ||
} | ||
|
||
private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) { | ||
Map<Long, Integer> snapshotOrdinals = Maps.newHashMap(); | ||
|
||
|
@@ -133,51 +128,124 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh | |
return snapshotOrdinals; | ||
} | ||
|
||
private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> { | ||
private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; | ||
private static class DummyChangelogScanTask implements ChangelogScanTask { | ||
public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); | ||
|
||
private final Map<Long, Integer> snapshotOrdinals; | ||
private DummyChangelogScanTask() {} | ||
|
||
CreateDataFileChangeTasks(Deque<Snapshot> snapshots) { | ||
this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); | ||
@Override | ||
public ChangelogOperation operation() { | ||
return ChangelogOperation.DELETE; | ||
} | ||
|
||
@Override | ||
public int changeOrdinal() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public long commitSnapshotId() { | ||
return 0L; | ||
} | ||
} | ||
|
||
private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> { | ||
private final long snapshotId; | ||
private final long sequenceNumber; | ||
private final int changeOrdinal; | ||
|
||
CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int changeOrdinal) { | ||
this.snapshotId = snapshotId; | ||
this.sequenceNumber = sequenceNumber; | ||
this.changeOrdinal = changeOrdinal; | ||
} | ||
|
||
@Override | ||
public CloseableIterable<ChangelogScanTask> apply( | ||
CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) { | ||
|
||
return CloseableIterable.transform( | ||
entries, | ||
entry -> { | ||
long commitSnapshotId = entry.snapshotId(); | ||
int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); | ||
DataFile dataFile = entry.file().copy(context.shouldKeepStats()); | ||
|
||
switch (entry.status()) { | ||
case ADDED: | ||
return new BaseAddedRowsScanTask( | ||
changeOrdinal, | ||
commitSnapshotId, | ||
dataFile, | ||
NO_DELETES, | ||
context.schemaAsString(), | ||
context.specAsString(), | ||
context.residuals()); | ||
|
||
case DELETED: | ||
return new BaseDeletedDataFileScanTask( | ||
changeOrdinal, | ||
commitSnapshotId, | ||
dataFile, | ||
NO_DELETES, | ||
context.schemaAsString(), | ||
context.specAsString(), | ||
context.residuals()); | ||
|
||
default: | ||
throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); | ||
} | ||
}); | ||
CloseableIterable<ChangelogScanTask> tasks = | ||
CloseableIterable.transform( | ||
entries, | ||
entry -> { | ||
long entrySnapshotId = entry.snapshotId(); | ||
DataFile dataFile = entry.file().copy(context.shouldKeepStats()); | ||
DeleteFile[] deleteFiles = context.deletes().forEntry(entry); | ||
List<DeleteFile> added = Lists.newArrayList(); | ||
List<DeleteFile> existing = Lists.newArrayList(); | ||
for (DeleteFile deleteFile : deleteFiles) { | ||
if (sequenceNumber == deleteFile.dataSequenceNumber()) { | ||
added.add(deleteFile); | ||
} else { | ||
existing.add(deleteFile); | ||
} | ||
} | ||
DeleteFile[] addedDeleteFiles = added.toArray(new DeleteFile[0]); | ||
DeleteFile[] existingDeleteFiles = existing.toArray(new DeleteFile[0]); | ||
Comment on lines
+174
to
+184
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, I used a map from delete files to the snapshot where it is added, to determine the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pvary this was the bug I mentioned that I needed to fix before pushing a change with additional tests in |
||
|
||
switch (entry.status()) { | ||
case ADDED: | ||
if (entrySnapshotId == snapshotId) { | ||
return new BaseAddedRowsScanTask( | ||
changeOrdinal, | ||
snapshotId, | ||
dataFile, | ||
addedDeleteFiles, | ||
context.schemaAsString(), | ||
context.specAsString(), | ||
context.residuals()); | ||
} else { | ||
// the data file is added before the snapshot we're processing | ||
if (addedDeleteFiles.length == 0) { | ||
return DummyChangelogScanTask.INSTANCE; | ||
} else { | ||
return new BaseDeletedRowsScanTask( | ||
changeOrdinal, | ||
snapshotId, | ||
dataFile, | ||
addedDeleteFiles, | ||
existingDeleteFiles, | ||
context.schemaAsString(), | ||
context.specAsString(), | ||
context.residuals()); | ||
} | ||
} | ||
|
||
case DELETED: | ||
if (entrySnapshotId == snapshotId) { | ||
return new BaseDeletedDataFileScanTask( | ||
changeOrdinal, | ||
snapshotId, | ||
dataFile, | ||
existingDeleteFiles, | ||
context.schemaAsString(), | ||
context.specAsString(), | ||
context.residuals()); | ||
} else { | ||
return DummyChangelogScanTask.INSTANCE; | ||
} | ||
|
||
case EXISTING: | ||
if (addedDeleteFiles.length == 0) { | ||
return DummyChangelogScanTask.INSTANCE; | ||
} else { | ||
return new BaseDeletedRowsScanTask( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this cause a full table scan for every snapshot in the snapshot range we are checking when there is an equality delete file present? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that a BaseDeletedRowsScanTask is only created if there are delete files added in that snapshot that may apply to the data file. |
||
changeOrdinal, | ||
snapshotId, | ||
dataFile, | ||
addedDeleteFiles, | ||
existingDeleteFiles, | ||
context.schemaAsString(), | ||
context.specAsString(), | ||
context.residuals()); | ||
} | ||
|
||
default: | ||
throw new IllegalArgumentException( | ||
"Unexpected entry status: " + entry.status()); | ||
} | ||
}); | ||
return CloseableIterable.filter(tasks, task -> (task != DummyChangelogScanTask.INSTANCE)); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will cause a substantial performance hit as we will scan all data and delete manifests that match the filter for each changelog snapshot, instead of opening only newly added manifests before. We may have to do that for deletes anyway, but I wonder about data manifests. Let me think a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went snapshot by snapshot as it was the easiest to reason with. For sure we need to consider more than just newly added manifests. For each snapshot, we need to consider existing data files if there are new deletes applicable to them.