Support changelog scan for table with delete files #10935

Open · wants to merge 11 commits into base: main
218 changes: 143 additions & 75 deletions core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
@@ -19,16 +19,14 @@
 package org.apache.iceberg;

 import java.util.ArrayDeque;
-import java.util.Collection;
 import java.util.Deque;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
 import org.apache.iceberg.ManifestGroup.TaskContext;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
@@ -63,33 +61,39 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }

-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);

-    Set<ManifestFile> newDataManifests =
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
         FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
-
-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)
[Comment — Contributor] This will cause a substantial performance hit, as we will scan all data and delete manifests that match the filter for each changelog snapshot, instead of opening only newly added manifests as before. We may have to do that for deletes anyway, but I wonder about data manifests. Let me think a bit.

[Comment — Author] I went snapshot by snapshot as it was the easiest to reason about. For sure we need to consider more than just newly added manifests. For each snapshot, we need to consider existing data files if there are new deletes applicable to them.
+                          .specsById(table().specs())
+                          .caseSensitive(isCaseSensitive())
+                          .select(scanColumns())
+                          .filterData(filter())
+                          .columnsToKeepStats(columnsToKeepStats());
+
+                  if (shouldIgnoreResiduals()) {
+                    manifestGroup = manifestGroup.ignoreResiduals();
+                  }
+
+                  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
+                    manifestGroup = manifestGroup.planWith(planExecutor());
+                  }
+
+                  long snapshotId = snapshot.snapshotId();
+                  long sequenceNumber = snapshot.sequenceNumber();
+                  int changeOrdinal = snapshotOrdinals.get(snapshotId);
+                  return manifestGroup.plan(
+                      new CreateDataFileChangeTasks(snapshotId, sequenceNumber, changeOrdinal));
+                });
+
+    return CloseableIterable.concat(plans);
   }
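The planning code hands each snapshot's id, sequence number, and change ordinal to CreateDataFileChangeTasks, which later splits the delete files applicable to an entry into those added by the snapshot being processed and those already in effect. That split can be sketched in isolation; this is an illustrative standalone sketch, and `SketchDeleteFile` and `splitBySequenceNumber` are hypothetical names, not Iceberg API:

```java
import java.util.ArrayList;
import java.util.List;

public class DeleteFileSplitSketch {
  // Hypothetical stand-in for Iceberg's DeleteFile; only the field we need here.
  record SketchDeleteFile(String path, long dataSequenceNumber) {}

  // A delete file was added by the snapshot being processed exactly when its
  // data sequence number equals that snapshot's sequence number; otherwise it
  // was already in effect before this snapshot.
  static List<List<SketchDeleteFile>> splitBySequenceNumber(
      List<SketchDeleteFile> deletes, long snapshotSequenceNumber) {
    List<SketchDeleteFile> added = new ArrayList<>();
    List<SketchDeleteFile> existing = new ArrayList<>();
    for (SketchDeleteFile df : deletes) {
      if (df.dataSequenceNumber() == snapshotSequenceNumber) {
        added.add(df);
      } else {
        existing.add(df);
      }
    }
    return List.of(added, existing);
  }

  public static void main(String[] args) {
    List<SketchDeleteFile> deletes =
        List.of(new SketchDeleteFile("d1.parquet", 3), new SketchDeleteFile("d2.parquet", 5));
    List<List<SketchDeleteFile>> parts = splitBySequenceNumber(deletes, 5);
    // d2 was added by the snapshot with sequence number 5; d1 predates it
    System.out.println(parts.get(0).size() + " added, " + parts.get(1).size() + " existing");
  }
}
```

Under these assumptions, a delete file carried over from before the scan range still lands in `existing`, which is what the sequence-number comparison buys over tracking snapshot ids.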

@Override
@@ -105,22 +109,13 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl

     for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
       if (!snapshot.operation().equals(DataOperations.REPLACE)) {
-        if (!snapshot.deleteManifests(table().io()).isEmpty()) {
-          throw new UnsupportedOperationException(
-              "Delete files are currently not supported in changelog scans");
-        }
-
         changelogSnapshots.addFirst(snapshot);
       }
     }

     return changelogSnapshots;
   }

-  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
-    return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
-  }
-
   private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {
     Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();

@@ -133,51 +128,124 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }

-  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+    public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask();

-    private final Map<Long, Integer> snapshotOrdinals;
+    private DummyChangelogScanTask() {}

-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    @Override
+    public ChangelogOperation operation() {
+      return ChangelogOperation.DELETE;
     }
+
+    @Override
+    public int changeOrdinal() {
+      return 0;
+    }
+
+    @Override
+    public long commitSnapshotId() {
+      return 0L;
+    }
+  }
+
+  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
+    private final long snapshotId;
+    private final long sequenceNumber;
+    private final int changeOrdinal;
+
+    CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int changeOrdinal) {
+      this.snapshotId = snapshotId;
+      this.sequenceNumber = sequenceNumber;
+      this.changeOrdinal = changeOrdinal;
+    }

     @Override
     public CloseableIterable<ChangelogScanTask> apply(
         CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {

-      return CloseableIterable.transform(
-          entries,
-          entry -> {
-            long commitSnapshotId = entry.snapshotId();
-            int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
-            DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
-            switch (entry.status()) {
-              case ADDED:
-                return new BaseAddedRowsScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              case DELETED:
-                return new BaseDeletedDataFileScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
-              default:
-                throw new IllegalArgumentException("Unexpected entry status: " + entry.status());
-            }
-          });
+      CloseableIterable<ChangelogScanTask> tasks =
+          CloseableIterable.transform(
+              entries,
+              entry -> {
+                long entrySnapshotId = entry.snapshotId();
+                DataFile dataFile = entry.file().copy(context.shouldKeepStats());
+                DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
+                List<DeleteFile> added = Lists.newArrayList();
+                List<DeleteFile> existing = Lists.newArrayList();
+                for (DeleteFile deleteFile : deleteFiles) {
+                  if (sequenceNumber == deleteFile.dataSequenceNumber()) {
+                    added.add(deleteFile);
+                  } else {
+                    existing.add(deleteFile);
+                  }
+                }
+                DeleteFile[] addedDeleteFiles = added.toArray(new DeleteFile[0]);
+                DeleteFile[] existingDeleteFiles = existing.toArray(new DeleteFile[0]);
Comment on lines +174 to +184

[Comment — Author] Previously, I used a map from delete files to the snapshot where each was added to determine the addedDeleteFiles and existingDeleteFiles. However, that map is computed only for the snapshots in the range of interest (for the BaseIncrementalChangelogScan), so there may be no entry in the map for a delete file added in a snapshot before this range. This caused issues in the FluentIterable::filter(Predicate) I was using to filter the delete files. This approach is simpler, and turns out to be what @manuzhang used in his #9888. Hat tip to Manu.

[Comment — Author] @pvary this was the bug I mentioned that I needed to fix before pushing a change with additional tests in TestBaseIncrementalChangelogScan. In TestBaseIncrementalChangelogScan, in testDeletingDataFileWithExistingDeletes and testDeletingRowsInDataFileWithExistingDeletes, if the IncrementalChangelogScan was from snap1 to snap3, the bug was obscured; but with a scan from snap2 to snap3, those tests revealed the bug.

+                switch (entry.status()) {
+                  case ADDED:
+                    if (entrySnapshotId == snapshotId) {
+                      return new BaseAddedRowsScanTask(
+                          changeOrdinal,
+                          snapshotId,
+                          dataFile,
+                          addedDeleteFiles,
+                          context.schemaAsString(),
+                          context.specAsString(),
+                          context.residuals());
+                    } else {
+                      // the data file is added before the snapshot we're processing
+                      if (addedDeleteFiles.length == 0) {
+                        return DummyChangelogScanTask.INSTANCE;
+                      } else {
+                        return new BaseDeletedRowsScanTask(
+                            changeOrdinal,
+                            snapshotId,
+                            dataFile,
+                            addedDeleteFiles,
+                            existingDeleteFiles,
+                            context.schemaAsString(),
+                            context.specAsString(),
+                            context.residuals());
+                      }
+                    }
+
+                  case DELETED:
+                    if (entrySnapshotId == snapshotId) {
+                      return new BaseDeletedDataFileScanTask(
+                          changeOrdinal,
+                          snapshotId,
+                          dataFile,
+                          existingDeleteFiles,
+                          context.schemaAsString(),
+                          context.specAsString(),
+                          context.residuals());
+                    } else {
+                      return DummyChangelogScanTask.INSTANCE;
+                    }
+
+                  case EXISTING:
+                    if (addedDeleteFiles.length == 0) {
+                      return DummyChangelogScanTask.INSTANCE;
+                    } else {
+                      return new BaseDeletedRowsScanTask(
[Comment — Contributor] Would this cause a full table scan for every snapshot in the snapshot range we are checking when there is an equality delete file present?

[Comment — Author] Note that a BaseDeletedRowsScanTask is only created if there are delete files added in that snapshot that may apply to the data file. Supposing that there is an equality delete file added in that snapshot, whether a full table scan happens for that snapshot depends on the equality deletes and how well DeleteFileIndex.canContainEqDeletesForFile(DataFile, EqualityDeleteFile) is able to filter them. If the equality deletes are for a partition, or even global but excludable using ranges, then a full table scan may not happen. But yes, depending on the equality deletes, it could happen. Unfortunately, I don't see an alternative.

+                          changeOrdinal,
+                          snapshotId,
+                          dataFile,
+                          addedDeleteFiles,
+                          existingDeleteFiles,
+                          context.schemaAsString(),
+                          context.specAsString(),
+                          context.residuals());
+                    }
+
+                  default:
+                    throw new IllegalArgumentException(
+                        "Unexpected entry status: " + entry.status());
+                }
+              });
+      return CloseableIterable.filter(tasks, task -> (task != DummyChangelogScanTask.INSTANCE));
     }
   }
 }
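DummyChangelogScanTask works as a sentinel: entries that contribute no change are mapped to one shared placeholder instance, which a final filter removes by reference identity, so the transform function can stay total without returning null. A minimal standalone sketch of that pattern, with illustrative names rather than Iceberg API:

```java
import java.util.List;
import java.util.stream.Collectors;

public class SentinelFilterSketch {
  // Single shared sentinel; removed later by identity comparison, mirroring
  // task != DummyChangelogScanTask.INSTANCE in the changelog scan.
  static final String NO_TASK = new String("NO_TASK");

  static String toTask(int rowDelta) {
    // The transform must produce something for every entry; entries with no
    // net change produce the sentinel instead of null.
    return rowDelta == 0 ? NO_TASK : "task(" + rowDelta + ")";
  }

  public static void main(String[] args) {
    List<String> tasks =
        List.of(2, 0, -1).stream()
            .map(SentinelFilterSketch::toTask)
            .filter(t -> t != NO_TASK) // identity check keeps equal-valued real tasks
            .collect(Collectors.toList());
    System.out.println(tasks); // [task(2), task(-1)]
  }
}
```

Using one shared instance keeps the identity check cheap and avoids null handling inside CloseableIterable pipelines.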