Support changelog scan for table with delete files #10935
Conversation
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
```java
if (addedDeleteFiles.length == 0) {
  return new DummyChangelogScanTask(changeOrdinal, snapshotId);
} else {
  return new BaseDeletedRowsScanTask(
```
Would this cause a full table scan for every snapshot in the snapshot range we are checking when there is an equality delete file present?
Note that a BaseDeletedRowsScanTask is only created if there are delete files added in that snapshot that may apply to the data file.
Supposing that there is an equality delete file added in that snapshot, whether a full table scan happens for that snapshot depends on the equality deletes and how well DeleteFileIndex.canContainEqDeletesForFile(DataFile, EqualityDeleteFile) is able to filter them. If the equality deletes are for a partition, or even if global but can be excluded using ranges, then a full table scan may not happen. But yes, depending on the equality deletes, it could happen.
Unfortunately, I don't see an alternative.
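For intuition, here is a minimal, illustrative sketch of the kind of range-based pruning described above. The class, method, and simplified long-valued bounds are assumptions for illustration only, not the actual DeleteFileIndex code, which works on typed ByteBuffer bounds:

```java
import java.util.Map;

// Illustrative only: an equality delete file can be skipped for a data file
// if, on any equality field, their value ranges are disjoint.
final class EqDeletePruningSketch {
  static boolean mayContainEqDeletes(
      Iterable<Integer> equalityFieldIds,
      Map<Integer, Long> dataLower, Map<Integer, Long> dataUpper,
      Map<Integer, Long> deleteLower, Map<Integer, Long> deleteUpper) {
    for (int fieldId : equalityFieldIds) {
      Long dl = dataLower.get(fieldId);
      Long du = dataUpper.get(fieldId);
      Long el = deleteLower.get(fieldId);
      Long eu = deleteUpper.get(fieldId);
      if (dl == null || du == null || el == null || eu == null) {
        continue; // missing stats on this field: cannot use it to prune
      }
      if (du < el || eu < dl) {
        return false; // ranges are disjoint: the delete cannot match this file
      }
    }
    return true; // could not rule the delete file out
  }
}
```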
@manuzhang I was not aware of it. Let me take a look. Unfortunately a lot of PRs don't get reviewed. All I knew was that changelog scan for the case where delete files are present was not implemented in main up to now.
Force-pushed from 1e22d51 to f3963fb:
…9888. This uses the BaseIncrementalChangelogScan implementation from apache#9888 and the rest of the changes (including tests) from apache#10935 to test the BaseIncrementalChangelogScan implementation.
@manuzhang thank you for pointing me to your PR, #9888. I have tried testing it and left comments on your PR.
@aokolnychyi @flyrain @stevenzwu @szehon-ho can you please review this?
@wypoon, @manuzhang: I'm interested in providing Flink streaming CDC reads. That would require working changelog scan planning in Iceberg core, so I would be happy to help with my reviews here. I'm not an expert in this part of the code yet, so if the PR becomes more complicated, we might have to ask for help from folks more experienced here, but I can do a first round of the reviews. I see that there are multiple PRs for this topic (#9888, #10954). I would suggest focusing the discussion on one specific PR. There are some tricky scenarios with Flink CDC that we need to handle:

Scenario 1: When the changelog scan reads the 3rd snapshot, it should consider: …

Notice that for DF1 we should not emit records which are deleted by previous deletes. So I am concerned about your …

Scenario 2: When the changelog scan reads the 2nd snapshot, it should consider: …

Notice that the order of the records is important, and it is not trivial to order the records if the files are read in a distributed way. CC: @dramaticlly
@pvary thank you for your interest and for the Flink scenarios, which are very helpful as I am unfamiliar with Flink. Regarding #9888, please read my comments there. I put up #10954 only as a reference for @manuzhang so he can see the tests I added, which fail with the implementation in #9888. Next week I'll look into the scenarios you listed, see what gaps there are in my implementation, and add tests as necessary. Regarding …
I agree that this should be the case. My only concern is that, until now, it didn't cause any issues. We might not have thought about it and didn't clarify this in the specification, so if we have a hard requirement here, we might want to clarify it in the spec too.
Yes, you understand correctly. That's what I was trying to describe.
@pvary I have added …
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
```java
// for snapshot 2, we should record only one delete for id=2 (from the equality delete)
// and one insert (the final value)
```
This is one way to present this data, and I would be totally fine with this result. For Flink this is as good as it can get.
Other writers might write delete records into the positional delete files (like df1.path(), 0L to remove "1,a") along with the ones which remove the new records. If those are handled correctly, then we are golden here.
These are great tests. Thanks for implementing them!
Do I understand correctly that this is not yet the situation with the current code?
I think we agree here. I'm perfectly fine if we can make sure that the added and immediately removed records are not emitted during the incremental scan read.
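To make the expectation concrete, here is a hedged sketch of the kind of assertion the Spark test might make; the row helper, the column values "b" and "c", and the snap2 variable are assumptions for illustration, not the actual test code:

```java
// Illustrative expectation only: for snapshot 2, the changelog contains one
// DELETE carrying the old value of id=2 and one INSERT with its final value;
// a record added and deleted within the same snapshot yields no output rows.
List<Object[]> expected =
    ImmutableList.of(
        row(2, "b", "DELETE", 1, snap2.snapshotId()),
        row(2, "c", "INSERT", 1, snap2.snapshotId()));
```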
I think the output for snapshot3 in scenario1 is not correct. I have left a comment there.
cc @dramaticlly, I think you might be interested.
@pvary thanks for reviewing the tests!
Sorry for the confusion @wypoon!
Anyway, I added my comments to the thread as well.
@pvary thanks for posting on the dev list thread!
This corresponds to case (b) in the thread. But now I understand that you must have made a mistake and did not really mean the … and that you think (a) is the correct behavior.
```java
// Predicate to test whether a row has not been deleted by equality deletes
public Predicate<T> eqDeletedRowFilter() {
  return isEqDeleted().negate();
}
```
This is only used in one place, ColumnarBatchReader.ColumnarBatchLoader::applyEqDelete(ColumnarBatch). I propose deprecating this and changing the call to the negation of isEqDeleted() instead; isEqDeleted() is more useful. I'll do that in a separate PR.
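A sketch of what the proposed call-site change might look like; the deleteFilter variable name is illustrative, and this is not the PR's actual diff:

```java
import java.util.function.Predicate;
import org.apache.spark.sql.catalyst.InternalRow;

// Before: the convenience wrapper keeps rows that are not equality-deleted.
Predicate<InternalRow> keepRow = deleteFilter.eqDeletedRowFilter();

// After: negate isEqDeleted() at the call site; isEqDeleted() itself is also
// directly usable wherever the "is deleted" sense is wanted.
Predicate<InternalRow> keepRowNew = deleteFilter.isEqDeleted().negate();
```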
```java
return CloseableIterable.transform(
    deletes.filter(rows(task, deletes.requiredSchema())),
    row -> {
      InternalRow expectedRow = new GenericInternalRow(columns.length);

      for (int i = 0; i < columns.length; i++) {
        expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
      }

      return expectedRow;
    });
```
When DeleteFilter::filter is called, if there are deletes to be applied, the requiredSchema could have additional columns beyond the requested schema (e.g., _pos is added if there are positional deletes to be applied). Thus the InternalRow could have more columns than the requested schema, and we need to exclude the additional columns and keep only the ones for the requested schema.
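One way to picture the projection described above (an illustrative sketch, not the PR's code, with requestedSchema and requiredSchema as assumed org.apache.iceberg.Schema variables): compute, for each requested column, its index in the wider required schema, then copy only those positions.

```java
// Illustrative: map requested columns to their positions in the required
// schema, so that extra columns such as _pos are dropped on projection.
int[] indexes = new int[requestedSchema.columns().size()];
for (int i = 0; i < indexes.length; i++) {
  int fieldId = requestedSchema.columns().get(i).fieldId();
  // position of the same field within the (possibly wider) required schema
  indexes[i] = requiredSchema.columns().indexOf(requiredSchema.findField(fieldId));
}
```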
Yeah, I realized this as well when reading RowDataReader; it looks like if there is a position delete present, it will always add the _pos metadata column as required, in iceberg/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java, lines 260 to 263 (at 684f7a7):

```java
if (!posDeletes.isEmpty()) {
  requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
}
```
Speaking of which, I saw this update to InternalRow is repeated here and below for deletedDataFileScanTask; do you know if we can put it into a method instead?
The code looks the same, but it is not the same transform, as it is a closure: indexes in each closure is different, since indexes is a function of the SparkDeleteFilter in each case, and that is different.
I didn't try too hard to avoid the code repetition here, as I didn't feel it was worth the effort, but let me think a bit more.
I'm adding a method projectRow, so that the transform can be written as row -> projectRow(row, indexes).
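A minimal sketch of such a helper, assuming it lives next to the transforms and that columns and sparkColumnTypes are fields of the enclosing reader, as in the quoted snippet; the exact signature in the PR may differ:

```java
// Illustrative sketch: copy only the requested columns out of a row that may
// carry extra required columns such as _pos.
private InternalRow projectRow(InternalRow row, int[] indexes) {
  GenericInternalRow expectedRow = new GenericInternalRow(columns.length);
  for (int i = 0; i < columns.length; i++) {
    expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
  }
  return expectedRow;
}
```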
@pvary I have fixed the implementation so that existing deletes are applied before new deletes are emitted. I have fixed the test case accordingly. (I also renamed …)
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java
This looks good to me.
@pvary can you please help move this forward then?
@RussellSpitzer, @aokolnychyi: would it be possible to take a look at this PR? I did my best to review it, but I'm not an expert in this part of the code.
@flyrain you worked on this area and implemented some of the changelog support; can you please review?
Will start looking into this PR today and should be able to finish over the weekend.
@flyrain and @aokolnychyi, as you have both expressed interest in reviewing this, I hope you can find time to do so soon! Thank you!
```java
List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());

ManifestGroup manifestGroup =
    new ManifestGroup(table().io(), dataManifests, deleteManifests)
```
This will cause a substantial performance hit as we will scan all data and delete manifests that match the filter for each changelog snapshot, instead of opening only newly added manifests before. We may have to do that for deletes anyway, but I wonder about data manifests. Let me think a bit.
I went snapshot by snapshot as it was the easiest to reason with. For sure we need to consider more than just newly added manifests. For each snapshot, we need to consider existing data files if there are new deletes applicable to them.
I went through some of my old notes, which we should discuss. We have the following tasks right now: …

First of all, we need to discuss the expected behavior: …

If we want to make resolving equality deletes optional, we need …

I agree with the idea of iterating snapshot by snapshot. I wonder if we can optimize it, though.

AddedRowsScanTask: We don't have to look up historic deletes, as the file was just added, except for position deletes added in the same snapshot. For each added data file, it is enough to look up matching position deletes in …

DeletedDataFileScanTask: We have to build …

DeletedRowsScanTask: For each position delete file and each equality delete file (if those must be resolved), find matching data files and historic deletes (if configured to resolve double deletes). We can still build a partition predicate from all delete files that were added in this snapshot. We can use that predicate to prune the set of data manifests for that snapshot. For each newly added delete file, output a task for each affected data file (one delete file can match multiple data files).

AddedEqualityDeletesScanTask: Output each added equality delete file as a separate task, without matching it to data files (if configured). A rough sketch of the manifest-pruning idea follows.
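Sketch of the pruning idea under discussion; toDataPredicate is a hypothetical helper that would turn a delete file's partition tuple and column bounds into an expression over the data, and none of this is the PR's actual code:

```java
// Illustrative only: combine the delete files added in a snapshot into one
// predicate and use it to prune the data manifests scanned for that snapshot.
Expression dataFilter = Expressions.alwaysFalse();
for (DeleteFile deleteFile : addedDeleteFiles) {
  // toDataPredicate is hypothetical: derive a predicate from the delete
  // file's partition and bounds
  dataFilter = Expressions.or(dataFilter, toDataPredicate(deleteFile));
}

ManifestGroup manifestGroup =
    new ManifestGroup(table().io(), dataManifests, deleteManifests)
        .filterData(dataFilter);
```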
What I wrote above is more complicated; we need to understand whether that complexity will be justified. I am not sure yet.
If the table is written by Flink, then for Flink CDC streaming reads the lazy solution (not resolving the equality deletes) would be enough. If there is another writer for the table which creates equality delete files that don't conform to Flink's conventions, or the user wants a retracting CDC stream when the table was written by an upsert CDC stream, then equality delete resolution is still needed.
I think that applying delete files is less costly, so I would stick to the theoretically correct solution and apply previously added delete files to produce the precise CDC log.
I was not aware of this. Is this specifically the case of two concurrent delete operations starting from the same base snapshot, both removing the same set of records, and one winning the commit race, but the other still succeeding in committing? Let's say we are in the former case; then I'd say that for the purpose of CDC, the second commit would contain no changes, so the changelog should not write anything for it.
Coming back to some questions above.
While it would be useful to stream out an ingested CDC log from Flink without applying equality deletes, we probably shouldn't target that for now. If we start outputting equality deletes directly, our changelog may not be accurate. What if we upsert a record that didn't exist? What if an equality delete applied to 10 data records? We can find reasonable behavior, but let's skip this for now. Let's always resolve and assign equality deletes so that the changelog is precise.
There is one more data point here. We are about to introduce sync maintenance for position deletes. This means new delete files will include old + new deleted positions. Therefore, we must always resolve historic deletes. Engines would have to diff existing and new deletes.

To sum up, I propose sticking to the existing changelog tasks and always resolving historic deletes to produce a correct changelog. I am concerned about full table scans for each incremental snapshot. I'll look into the ideas mentioned above to see if we can optimize that.
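To illustrate the diffing engines would need once new position delete files carry old plus new positions: the positions newly deleted in a snapshot are a set difference. readPositions is a hypothetical helper here, not an existing Iceberg API:

```java
import java.util.Set;
import com.google.common.collect.Sets;

// Illustrative only: subtract the positions already deleted in older delete
// files from those in the newly written delete files for the same data file.
Set<Long> previouslyDeleted = readPositions(existingDeleteFiles, dataFile.path());
Set<Long> currentlyDeleted = readPositions(addedDeleteFiles, dataFile.path());
Set<Long> newlyDeleted = Sets.difference(currentlyDeleted, previouslyDeleted);
```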
@aokolnychyi I agree that we should stick to existing changelog tasks and always resolve historical deletes to produce the changelog. Have you thought of any optimizations for processing the snapshots? How can we move forward with this?
Fix test. Also rename the Flink scenario tests.
... and adopt some suggestions from review feedback.
Force-pushed from dc8d84a to 2000982.
@aokolnychyi I have rebased the PR on main. How can we move forward with this?
Currently, changelog scan is only supported for a table with no delete files. We implement support for the case where delete files are present in the snapshots to be scanned.