Testing the BaseIncrementalChangelogScan implementation from apache#9888.

This uses the BaseIncrementalChangelogScan implementation from apache#9888
and the rest of the changes (including tests) from apache#10935 to test
that implementation.
wypoon committed Aug 16, 2024
1 parent 33259f9 commit d9307d3
Showing 7 changed files with 490 additions and 100 deletions.
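For context, this is roughly how the incremental changelog scan exercised by these changes is driven from the table API. It is a minimal sketch, not code from this commit: the table and snapshot IDs are placeholders, while the calls shown (newIncrementalChangelogScan, fromSnapshotExclusive, toSnapshot, planFiles, operation, commitSnapshotId, changeOrdinal) are the existing public Iceberg scan and task methods.

import java.io.IOException;
import org.apache.iceberg.ChangelogScanTask;
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class ChangelogScanExample {
  public static void printChanges(Table table, long fromSnapshotId, long toSnapshotId)
      throws IOException {
    // Plan change tasks for all snapshots after fromSnapshotId, up to and
    // including toSnapshotId.
    IncrementalChangelogScan scan =
        table.newIncrementalChangelogScan()
            .fromSnapshotExclusive(fromSnapshotId)
            .toSnapshot(toSnapshotId);

    try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
      for (ChangelogScanTask task : tasks) {
        // Each task reports the kind of change, the commit it came from, and
        // its ordinal within the requested snapshot range.
        System.out.printf(
            "%s at snapshot %d (ordinal %d)%n",
            task.operation(), task.commitSnapshotId(), task.changeOrdinal());
      }
    }
  }
}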
@@ -21,15 +21,17 @@
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
import org.apache.iceberg.ManifestGroup.TaskContext;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;

@@ -63,21 +65,27 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
return CloseableIterable.empty();
}

Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);

Set<ManifestFile> newDataManifests =
FluentIterable.from(changelogSnapshots)
.transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
.toSet();
Set<ManifestFile> newDataManifests = Sets.newHashSet();
Set<ManifestFile> newDeleteManifests = Sets.newHashSet();
Map<Long, Snapshot> addedToChangedSnapshots = Maps.newHashMap();
for (Snapshot snapshot : changelogSnapshots) {
List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
for (ManifestFile manifest : dataManifests) {
if (!newDataManifests.contains(manifest)) {
addedToChangedSnapshots.put(manifest.snapshotId(), snapshot);
newDataManifests.add(manifest);
}
}
newDeleteManifests.addAll(snapshot.deleteManifests(table().io()));
}

ManifestGroup manifestGroup =
new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
new ManifestGroup(table().io(), newDataManifests, newDeleteManifests)
.specsById(table().specs())
.caseSensitive(isCaseSensitive())
.select(scanColumns())
.filterData(filter())
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
.filterManifestEntries(entry -> addedToChangedSnapshots.containsKey(entry.snapshotId()))
.ignoreExisting()
.columnsToKeepStats(columnsToKeepStats());

@@ -89,7 +97,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
manifestGroup = manifestGroup.planWith(planExecutor());
}

return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
return manifestGroup.plan(
new CreateDataFileChangeTasks(changelogSnapshots, addedToChangedSnapshots));
}

@Override
@@ -105,11 +114,6 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl

for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
if (!snapshot.operation().equals(DataOperations.REPLACE)) {
if (!snapshot.deleteManifests(table().io()).isEmpty()) {
throw new UnsupportedOperationException(
"Delete files are currently not supported in changelog scans");
}

changelogSnapshots.addFirst(snapshot);
}
}
@@ -134,50 +138,81 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
}

private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
private static final DeleteFile[] NO_DELETES = new DeleteFile[0];

private final Map<Long, Integer> snapshotOrdinals;
private final Map<Long, Snapshot> addedToChangedSnapshots;

CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
CreateDataFileChangeTasks(
Deque<Snapshot> snapshots, Map<Long, Snapshot> addedToChangedSnapshots) {
this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
this.addedToChangedSnapshots = addedToChangedSnapshots;
}

@Override
public CloseableIterable<ChangelogScanTask> apply(
CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {

return CloseableIterable.transform(
entries,
entry -> {
long commitSnapshotId = entry.snapshotId();
int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
DataFile dataFile = entry.file().copy(context.shouldKeepStats());

switch (entry.status()) {
case ADDED:
return new BaseAddedRowsScanTask(
changeOrdinal,
commitSnapshotId,
dataFile,
NO_DELETES,
context.schemaAsString(),
context.specAsString(),
context.residuals());

case DELETED:
return new BaseDeletedDataFileScanTask(
changeOrdinal,
commitSnapshotId,
dataFile,
NO_DELETES,
context.schemaAsString(),
context.specAsString(),
context.residuals());

default:
throw new IllegalArgumentException("Unexpected entry status: " + entry.status());
}
});
return CloseableIterable.filter(
CloseableIterable.transform(
entries,
entry -> {
long snapshotId = entry.snapshotId();
Snapshot snapshot = addedToChangedSnapshots.get(snapshotId);
long commitSnapshotId = snapshot.snapshotId();
int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId());
DataFile dataFile = entry.file().copy(context.shouldKeepStats());
DeleteFile[] deleteFiles = context.deletes().forDataFile(dataFile);
List<DeleteFile> addedDeletes = Lists.newArrayList();
List<DeleteFile> existingDeletes = Lists.newArrayList();
for (DeleteFile file : deleteFiles) {
if (file.dataSequenceNumber() == snapshot.sequenceNumber()) {
addedDeletes.add(file);
} else {
existingDeletes.add(file);
}
}

switch (entry.status()) {
case ADDED:
if (snapshotId == commitSnapshotId) {
return new BaseAddedRowsScanTask(
changeOrdinal,
commitSnapshotId,
dataFile,
addedDeletes.toArray(new DeleteFile[0]),
context.schemaAsString(),
context.specAsString(),
context.residuals());
} else if (deleteFiles.length > 0) {
return new BaseDeletedRowsScanTask(
changeOrdinal,
commitSnapshotId,
dataFile,
addedDeletes.toArray(new DeleteFile[0]),
existingDeletes.toArray(new DeleteFile[0]),
context.schemaAsString(),
context.specAsString(),
context.residuals());
} else {
return null;
}

case DELETED:
return new BaseDeletedDataFileScanTask(
changeOrdinal,
commitSnapshotId,
dataFile,
existingDeletes.toArray(new DeleteFile[0]),
context.schemaAsString(),
context.specAsString(),
context.residuals());

default:
throw new IllegalArgumentException(
"Unexpected entry status: " + entry.status());
}
}),
Objects::nonNull);
}
}
}
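The heart of the change above is how CreateDataFileChangeTasks classifies the delete files that apply to each data file: deletes whose data sequence number matches the changelog snapshot's sequence number were added by that snapshot, while anything older already applied beforehand. Newly added data files become BaseAddedRowsScanTask, previously added files that pick up new deletes become BaseDeletedRowsScanTask, and removed files become BaseDeletedDataFileScanTask. The following is a standalone sketch of that sequence-number split; the helper class and method are hypothetical and only mirror the comparison performed inline in the diff above.

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class ChangelogDeleteFileSplit {
  final List<DeleteFile> addedDeletes = Lists.newArrayList();
  final List<DeleteFile> existingDeletes = Lists.newArrayList();

  static ChangelogDeleteFileSplit of(DeleteFile[] deleteFiles, Snapshot snapshot) {
    ChangelogDeleteFileSplit split = new ChangelogDeleteFileSplit();
    for (DeleteFile file : deleteFiles) {
      // A delete file committed by this snapshot carries the snapshot's data
      // sequence number; older deletes were already applied to the data file
      // before this snapshot and only filter out previously deleted rows.
      if (file.dataSequenceNumber() == snapshot.sequenceNumber()) {
        split.addedDeletes.add(file);
      } else {
        split.existingDeletes.add(file);
      }
    }
    return split;
  }
}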
@@ -21,8 +21,6 @@
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.util.Comparator;
@@ -247,19 +245,6 @@ public void testDataFileRewrites() {
assertThat(t2.deletes()).as("Must be no deletes").isEmpty();
}

@TestTemplate
public void testDeleteFilesAreNotSupported() {
assumeThat(formatVersion).isEqualTo(2);

table.newFastAppend().appendFile(FILE_A2).appendFile(FILE_B).commit();

table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();

assertThatThrownBy(() -> plan(newScan()))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Delete files are currently not supported in changelog scans");
}

// plans tasks and reorders them to have deterministic order
private List<ChangelogScanTask> plan(IncrementalChangelogScan scan) {
try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
22 changes: 11 additions & 11 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -125,7 +125,7 @@ public void incrementDeleteCount() {
counter.increment();
}

Accessor<StructLike> posAccessor() {
protected Accessor<StructLike> posAccessor() {
return posAccessor;
}

@@ -197,31 +197,31 @@ record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
}

public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
// Predicate to test whether a row has been deleted by equality deletions.
Predicate<T> deletedRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);

return CloseableIterable.filter(records, deletedRows);
return CloseableIterable.filter(records, isEqDeleted());
}

private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
Predicate<T> isEqDeleted = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);

return createDeleteIterable(records, isEqDeleted);
return createDeleteIterable(records, isEqDeleted());
}

protected void markRowDeleted(T item) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement markRowDeleted");
}

public Predicate<T> eqDeletedRowFilter() {
// Predicate to test whether a row has been deleted by equality deletes
public Predicate<T> isEqDeleted() {
if (eqDeleteRows == null) {
eqDeleteRows =
applyEqDeletes().stream().map(Predicate::negate).reduce(Predicate::and).orElse(t -> true);
eqDeleteRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);
}
return eqDeleteRows;
}

// Predicate to test whether a row has not been deleted by equality deletes
public Predicate<T> eqDeletedRowFilter() {
return isEqDeleted().negate();
}

public PositionDeleteIndex deletedRowPositions() {
if (deleteRowPositions == null && !posDeletes.isEmpty()) {
this.deleteRowPositions = deleteLoader().loadPositionDeletes(posDeletes, filePath);
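The DeleteFilter change above inverts the predicate relationship: isEqDeleted() is now the cached OR of the per-delete-set membership predicates (true when a row has been equality-deleted), and eqDeletedRowFilter() is simply its negation. A self-contained sketch of that composition follows; the generic class and the per-delete-set predicates are placeholders, not DeleteFilter itself.

import java.util.List;
import java.util.function.Predicate;

class EqualityDeletePredicates<T> {
  private final List<Predicate<T>> perDeleteSetPredicates;
  private Predicate<T> eqDeleteRows = null;

  EqualityDeletePredicates(List<Predicate<T>> perDeleteSetPredicates) {
    this.perDeleteSetPredicates = perDeleteSetPredicates;
  }

  // True when any equality-delete set matches the row, i.e. the row is deleted.
  // Lazily built and cached, mirroring the refactored isEqDeleted() above.
  Predicate<T> isEqDeleted() {
    if (eqDeleteRows == null) {
      eqDeleteRows =
          perDeleteSetPredicates.stream().reduce(Predicate::or).orElse(t -> false);
    }
    return eqDeleteRows;
  }

  // True when the row survives every equality delete; the old eqDeletedRowFilter()
  // behavior, now expressed as the negation of isEqDeleted().
  Predicate<T> eqDeletedRowFilter() {
    return isEqDeleted().negate();
  }
}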
