
Support changelog scan for table with delete files #10935

Open · wants to merge 11 commits into base: main
Conversation

Contributor

@wypoon wypoon commented Aug 13, 2024

Currently, changelog scans are only supported for tables with no delete files. This change implements support for the case where delete files are present in the snapshots to be scanned.

if (addedDeleteFiles.length == 0) {
return new DummyChangelogScanTask(changeOrdinal, snapshotId);
} else {
return new BaseDeletedRowsScanTask(
Contributor

Would this cause a full table scan for every snapshot in the snapshot range we are checking when there is an equality delete file present?

Contributor Author

Note that a BaseDeletedRowsScanTask is only created if there are delete files added in that snapshot that may apply to the data file.
Supposing that there is an equality delete file added in that snapshot, whether a full table scan happens for that snapshot depends on the equality deletes and how well DeleteFileIndex.canContainEqDeletesForFile(DataFile, EqualityDeleteFile) is able to filter them. If the equality deletes are for a partition, or even if global but can be excluded using ranges, then a full table scan may not happen. But yes, depending on the equality deletes it could happen.
Unfortunately, I don't see an alternative.
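The range-based exclusion mentioned above can be illustrated with a simplified bounds check. This is only a sketch, not the actual DeleteFileIndex.canContainEqDeletesForFile logic, which also considers partitions, null counts, and multiple equality columns:

```java
// Simplified sketch of bounds-based exclusion for equality deletes.
// Illustrative only; not the real Iceberg DeleteFileIndex check.
class EqDeleteBounds {
  // An equality delete file may contain deletes for a data file only if
  // their value ranges on the equality column can overlap. Disjoint
  // ranges let the planner skip that delete file for that data file,
  // which is what can avoid the full-table-scan worst case.
  static boolean mayApply(long dataLower, long dataUpper, long deleteLower, long deleteUpper) {
    return dataLower <= deleteUpper && deleteLower <= dataUpper;
  }
}
```

In this toy model, when mayApply returns false for every delete/data file pair in a snapshot, no deleted-rows task needs to be created for that snapshot.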

@manuzhang
Contributor

@wypoon have you checked out #9888?

@wypoon
Contributor Author

wypoon commented Aug 15, 2024

@wypoon have you checked out #9888?

@manuzhang I was not aware of it. Let me take a look. Unfortunately, a lot of PRs don't get reviewed. All I knew was that a changelog scan for the case where delete files are present had not been implemented in main up to now.

@wypoon wypoon changed the title [DRAFT] Support changelog scan for table with delete files Support changelog scan for table with delete files Aug 16, 2024
wypoon added a commit to wypoon/iceberg that referenced this pull request Aug 16, 2024
…9888.

This uses the BaseIncrementalChangelogScan implementation from apache#9888
and the rest of the changes (including tests) from apache#10935 to test
the BaseIncrementalChangelogScan implementation.
@wypoon
Contributor Author

wypoon commented Aug 16, 2024

@manuzhang thank you for pointing me to your PR, #9888. I have tried testing it and left comments on your PR.

@wypoon
Contributor Author

wypoon commented Aug 16, 2024

@aokolnychyi @flyrain @stevenzwu @szehon-ho can you please review this?

@pvary
Contributor

pvary commented Aug 17, 2024

@wypoon, @manuzhang: I'm interested in providing Flink streaming CDC reads. That would require working changelog scan planning in Iceberg core, so I would be happy to help with reviews here. I'm not an expert on this part of the code yet, so if the PR becomes more complicated, we might have to ask for help from folks more experienced here, but I can do a first round of reviews.

I see that there are multiple PRs for this topic (#9888, #10954). I would suggest focusing the discussion on one specific PR.

There are some tricky scenarios with Flink CDC that we need to handle:


Scenario1:

  • 1st snapshot: Writing a new record to the table primary key 1 (PK1), and value 1 (V1)
    • Creates data file 1 (DF1) - with PK1-V1
  • 2nd snapshot: The record is updated with PK1 with value V2
    • Creates an equality delete 1 (ED1) - with PK1
    • Creates a new data file 2 (DF2) - with PK1-V2
  • 3rd snapshot: The record is updated with PK1 with value V3
    • Creates an equality delete 2 (ED2) - with PK1
    • Creates a new data file 3 (DF3) - with PK1-V3

When the changelog scan reads the 3rd snapshot, it should consider:

  • Delete reads:
    • DF1 - omit records deleted by ED1, emit records deleted by ED2
    • DF2 - emit records deleted by ED2
  • Data read:
    • DF3 - emit all records

Notice that for DF1 we should not emit records which are deleted by previous deletes. So I am concerned about your // not used comment here 😄
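The emission rule above (apply older deletes first, then emit only still-live rows matched by the new deletes) can be sketched as a small simulation. This is plain Java over toy data, not the Iceberg API; all names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy simulation of Scenario 1's changelog for the 3rd snapshot.
// Rows are "pk:value" strings; an equality delete matches on the pk.
class Scenario1Sim {
  static Predicate<String> eqDelete(String pk) {
    return row -> row.startsWith(pk + ":");
  }

  static List<String> changelogForSnapshot3() {
    List<String> df1 = List.of("PK1:V1");    // added in snapshot 1
    List<String> df2 = List.of("PK1:V2");    // added in snapshot 2
    List<String> df3 = List.of("PK1:V3");    // added in snapshot 3
    Predicate<String> ed1 = eqDelete("PK1"); // added in snapshot 2
    Predicate<String> ed2 = eqDelete("PK1"); // added in snapshot 3

    List<String> out = new ArrayList<>();
    // DF1: ED1 predates snapshot 3, so rows it deleted are already gone
    // and must not be emitted again, even though ED2 also matches them.
    for (String row : df1) {
      if (!ed1.test(row) && ed2.test(row)) {
        out.add("D(" + row + ")");
      }
    }
    // DF2: ED1 was committed together with DF2 and does not apply to it;
    // only ED2 applies.
    for (String row : df2) {
      if (ed2.test(row)) {
        out.add("D(" + row + ")");
      }
    }
    // DF3 was added in snapshot 3: emit all its rows as inserts.
    for (String row : df3) {
      out.add("I(" + row + ")");
    }
    return out;
  }
}
```

Under this rule, snapshot 3 emits only D(PK1:V2) and I(PK1:V3); DF1's row is suppressed because it was already deleted by ED1.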


Scenario2:

  • 1st snapshot: Writing a new record to the table primary key 1 (PK1), and value 1 (V1)
    • Creates data file 1 (DF1) - with PK1-V1
  • 2nd snapshot: The record is updated with PK1 with value V2, but later updated again with PK1 with value V3
    • Creates an equality delete 1 (ED1) - with PK1
    • Creates a new data file 2 (DF2) - with PK1-V2
    • Creates a positional delete 1 (PS1) - for the DF2-PK1
    • Here we have 2 possibilities:
      • Adds a new line to DF2 with PK1-V3 - if the data file target file size is not reached yet
      • Creates a new data file 3 (DF3) - with PK1-V3 - if the data file is already rolled over

When the changelog scan reads the 2nd snapshot, it should consider:

  • Delete reads:
    • DF1 - emit records deleted by ED1 - the emitted record is: D(PK1-V1)
    • DF2 - emit records deleted by PS1 - the emitted record is: D(PK1-V2)
  • Data read:
    • DF2 - omit records deleted by PS1 - the emitted record is: I(PK1-V2)
    • DF3 - emit all records - the emitted record is: I(PK1-V3)

Notice that the order of the records is important, and it is not trivial to preserve if the files are read in a distributed way.

CC: @dramaticlly

@wypoon
Contributor Author

wypoon commented Aug 17, 2024

@pvary thank you for your interest and for the Flink scenarios, which are very helpful as I am unfamiliar with Flink.

Regarding #9888, please read my comments there. I put up #10954 only as a reference for @manuzhang so he can see the tests I added which fail with the implementation of BaseIncrementalChangelogScan in #9888. #10954 is not really for consideration or review.

Next week I'll look into the scenarios you listed, see what gaps there are in my implementation, and add tests as necessary.

Regarding DeletedRowsScanTask: when @aokolnychyi added the API, he figured that it was necessary to know what deletes existed before the snapshot and what deletes are added in the snapshot. I'll have to think through the equality delete case, but for the positional delete case, I believe that it is not necessary to know the existing deletes (which is the reason for my // not used comment). For the positional delete case, I do not believe deleting the same position in the same data file again can happen, so added deletes should always be new positions. Thus, we only need to scan the data file and emit the rows that are deleted by the added deletes (which I do by using the _pos metadata column and the PositionDeleteIndex a DeleteFilter constructs for the data file). The _pos metadata column is automatically added to the schema if there are any position delete files to be applied to the data file, and for a DeletedRowsScanTask, the DeleteFilter is constructed with the added delete files (so those are the position delete files to be applied).
I admit that I hadn't thought through equality deletes carefully. Just to clarify, in your scenario1, for example, does ED1 contain PK=PK1? In other words, PK1 is the value of the primary key in the table, right? and by V1 you simply mean the values of the other columns? And then ED2 again contains PK=PK1? so that you can update the other columns of the same row with V2, right?
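The position-delete path described above can be sketched as a toy model. Plain Java stands in for the _pos metadata column and the PositionDeleteIndex built from the added delete files; this is not the actual DeleteFilter API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of emitting DELETE changelog records via row positions.
class PosDeleteEmit {
  // dataFileRows stands in for the scanned data file; addedDeletedPositions
  // stands in for the position index built from the delete files added in
  // this snapshot. Assuming a position is never deleted twice, existing
  // deletes need not be consulted: every added position is a new delete.
  static List<String> deletedRows(List<String> dataFileRows, Set<Long> addedDeletedPositions) {
    List<String> emitted = new ArrayList<>();
    for (long pos = 0; pos < dataFileRows.size(); pos++) { // pos plays the role of _pos
      if (addedDeletedPositions.contains(pos)) {
        emitted.add(dataFileRows.get((int) pos));
      }
    }
    return emitted;
  }
}
```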

@pvary
Contributor

pvary commented Aug 18, 2024

For the positional delete case, I do not believe deleting the same position in the same data file again can happen

I agree that this should be the case. My only concern is that, until now, it didn't cause any issues; we might not have thought about it and didn't clarify this in the specification. So if we have a hard requirement here, we might want to clarify it in the spec too.

Just to clarify, in your scenario1, for example, does ED1 contain PK=PK1? In other words, PK1 is the value of the primary key in the table, right? and by V1 you simply mean the values of the other columns? And then ED2 again contains PK=PK1? so that you can update the other columns of the same row with V2, right?

Yes, you understand correctly. That's what I was trying to describe.

@wypoon
Contributor Author

wypoon commented Aug 20, 2024

@pvary I have added testFlinkScenario1 and testFlinkScenario2 to TestChangelogReader based on your two scenarios. Please check the expected results. (I will rename the tests later with more descriptive names.)
For scenario 1, I agree with you on what the changelog should emit. If DF1 is in snapshot 3, then we should emit the row with PK1 being deleted by ED2. (The row is deleted by ED1 too, but we should only emit the row once, not twice, for snapshot 3).
For scenario 2, I think that when a row in a data file is deleted by a positional delete in the same commit, that row should neither be shown as inserted nor as deleted. This is where I think we disagree. (IIUC, you expect to see it as deleted but not as inserted. To me, that would be inconsistent.) This part of scenario 2 is actually already tested by testAddingAndDeletingInSameCommit.
If you agree with my analysis, then my implementation does handle at least your two scenarios correctly.

// for snapshot 2, we should record only one delete for id=2 (from the equality delete)
// and one insert (the final value)
Contributor

This is one way to present this data. And I would be totally fine with this result. For Flink this is as good as it can get.

Other writers might write delete records into the positional delete files (like df1.path(), 0L - to remove "1,a") along with the ones which remove the new records. If those are handled correctly, then we are golden here.

@pvary
Contributor

pvary commented Aug 21, 2024

@pvary I have added testFlinkScenario1 and testFlinkScenario2 to TestChangelogReader based on your two scenarios. Please check the expected results. (I will rename the tests later with more descriptive names.)

These are great tests. Thanks for implementing them!

For scenario 1, I agree with you on what the changelog should emit. If DF1 is in snapshot 3, then we should emit the row with PK1 being deleted by ED2. (The row is deleted by ED1 too, but we should only emit the row once, not twice, for snapshot 3).

Do I understand correctly that this is not yet the situation with the current code?

For scenario 2, I think that when a row in a data file is deleted by a positional delete in the same commit, that row should neither be shown as inserted nor as deleted. This is where I think we disagree. (IIUC, you expect to see it as deleted but not as inserted. To me, that would be inconsistent.) This part of scenario 2 is actually already tested by testAddingAndDeletingInSameCommit.

I think we agree here. I'm perfectly fine if we can make sure that the added and immediately removed records are not emitted during the incremental scan read.
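The rule being agreed on here can be shown with a small sketch: a row added and positionally deleted within the same snapshot is suppressed on both sides of the changelog. This is a toy Java model, not the Iceberg API; all names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model: rows added in a snapshot, minus positions deleted by a
// positional delete file committed in the same snapshot, yield the
// INSERT records; the suppressed rows never appear as DELETEs either.
class SameCommitSim {
  static List<String> inserts(List<String> addedRows, Set<Long> sameSnapshotDeletedPositions) {
    List<String> out = new ArrayList<>();
    for (long pos = 0; pos < addedRows.size(); pos++) {
      if (!sameSnapshotDeletedPositions.contains(pos)) {
        out.add("I(" + addedRows.get((int) pos) + ")");
      }
    }
    return out;
  }
}
```

In scenario 2's rolled-into-DF2 variant, DF2 holds [PK1-V2, PK1-V3] with PS1 deleting position 0, so only I(PK1-V3) is emitted for that file.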

If you agree with my analysis, then my implementation does handle at least your two scenarios correctly.

I think the output for snapshot3 in scenario1 is not correct. I have left a comment there.

@flyrain
Contributor

flyrain commented Aug 21, 2024

cc @dramaticlly, I think you might be interested.

@wypoon
Contributor Author

wypoon commented Aug 21, 2024

@pvary thanks for reviewing the tests!
Just a quick response for now. For scenario 1, the behavior is as I thought you described, yet in your comment on the test code, you indicated that it is incorrect, so I am confused. To be honest, I thought about it some more yesterday and I actually think it is incorrect too. So I sent an email to the Iceberg dev list (please see that email), asking for clarification.
The current behavior is case (b) in the email, which is what I thought you expected. I now think the behavior should be case (a). Please add your thoughts to the thread in the dev list too.

@pvary
Contributor

pvary commented Aug 21, 2024

@pvary thanks for reviewing the tests! Just a quick response for now. For scenario 1, the behavior is as I thought you described, yet in your comment on the test code, you indicated that it is incorrect, so I am confused. To be honest, I thought about it some more yesterday and I actually think it is incorrect too. So I sent an email to the Iceberg dev list (please see that email), asking for clarification. The current behavior is case (b) in the email, which is what I thought you expected. I now think the behavior should be case (a). Please add your thoughts to the thread in the dev list too.

Sorry for the confusion @wypoon!
I have tried to describe the (a) case in my description with this sentence:

Notice that for DF1 we should not emit records which are deleted by previous deletes.

Anyway, I added my comments to the thread as well.

@wypoon
Contributor Author

wypoon commented Aug 21, 2024

@pvary thanks for posting on the dev list thread!
In your description of scenario 1, you wrote:

When the changelog scan reads the 3rd snapshot, it should consider:

Delete reads:
DF1 - omit records deleted by ED1, emit records deleted by ED2
DF2 - emit records deleted by ED2
Data read:
DF3 - emit all records

This corresponds to case (b) in the thread. But now I understand that you must have made a mistake and did not really mean the

DF1 - omit records deleted by ED1, emit records deleted by ED2

and think that (a) is the correct behavior.
The current behavior is (b), but I'll work on changing the implementation to handle this equality delete case so that the behavior is (a).

@github-actions github-actions bot added the build label Aug 22, 2024
gradle.properties (outdated review comment, resolved)
Comment on lines +220 to +234
// Predicate to test whether a row has not been deleted by equality deletes
public Predicate<T> eqDeletedRowFilter() {
  return isEqDeleted().negate();
}
Contributor Author

This is only used in one place, ColumnarBatchReader.ColumnarBatchLoader::applyEqDelete(ColumnarBatch).
I propose deprecating this and changing the call to the negation of isEqDeleted() instead. isEqDeleted() is more useful. I'll do that in a separate PR.

Comment on lines 154 to 164
return CloseableIterable.transform(
    deletes.filter(rows(task, deletes.requiredSchema())),
    row -> {
      InternalRow expectedRow = new GenericInternalRow(columns.length);

      for (int i = 0; i < columns.length; i++) {
        expectedRow.update(i, row.get(indexes[i], sparkColumnTypes[i]));
      }

      return expectedRow;
    });
Contributor Author

When DeleteFilter::filter is called, if there are deletes to be applied, the requiredSchema could have additional columns beyond the requested schema (e.g., _pos is added if there are positional deletes to be applied). Thus the InternalRow could have more columns than the requested schema, and we need to exclude the additional columns and only keep the ones for the requested schema.
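The projection described above can be sketched roughly like this, with plain arrays standing in for Spark's InternalRow; the indexes mapping is an assumed precomputed position of each requested column within requiredSchema:

```java
// Illustrative projection from a row read under requiredSchema (which may
// carry extra metadata columns such as _pos) down to the requested schema.
// Not the actual Spark/Iceberg classes.
class RowProjectionSketch {
  static Object[] project(Object[] requiredRow, int[] indexes) {
    Object[] projected = new Object[indexes.length];
    for (int i = 0; i < indexes.length; i++) {
      projected[i] = requiredRow[indexes[i]]; // keep only requested columns
    }
    return projected;
  }
}
```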

Contributor

Yeah, I realized this as well when reading RowDataReader; it looks like if there are position deletes present, it will always add the _pos metadata column as required in

if (!posDeletes.isEmpty()) {
  requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
}

as well as the equality delete field ids if equality deletes are present.

Contributor

Speaking of which, I saw that this update to InternalRow is repeated here and below for deletedDataFileScanTask; do you know if we can put it into a method instead?

Contributor Author

@wypoon wypoon Sep 12, 2024

The code looks the same, but it is not the same transform: each is a closure, and indexes is different in each closure, since indexes is a function of the SparkDeleteFilter, which differs in each case.
I didn't try too hard to avoid the code repetition here, as I didn't feel it was worth the effort, but let me think a bit more.

Contributor Author

I'm adding a method projectRow, so that the transform can be written as row -> projectRow(row, indexes).

@wypoon
Contributor Author

wypoon commented Aug 22, 2024

@pvary I have fixed the implementation so that existing deletes are applied before new deletes are emitted. I have fixed the test case accordingly. (I also renamed testFlinkScenario1 and testFlinkScenario2.) I think things work as you expect now.

@pvary
Contributor

pvary commented Sep 23, 2024

This looks good to me.
@dramaticlly: Any more comments, before we try to involve the guys who are more experienced with the core/spark parts?

@wypoon
Contributor Author

wypoon commented Sep 25, 2024

@pvary can you please help move this forward then?

@pvary
Contributor

pvary commented Sep 25, 2024

@RussellSpitzer, @aokolnychyi: would it be possible to take a look at this PR? I did my best to review it, but I'm not an expert in this part of the code.
Thanks, Peter

@wypoon
Contributor Author

wypoon commented Sep 25, 2024

@flyrain you worked on this area and implemented some of the changelog support; can you please review?

@aokolnychyi
Contributor

Will start looking into this PR today and should be able to finish over the weekend.

@wypoon
Contributor Author

wypoon commented Oct 16, 2024

@flyrain and @aokolnychyi as you have both expressed interest to review this, I hope you can find time to do so soon! Thank you!

List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());

ManifestGroup manifestGroup =
new ManifestGroup(table().io(), dataManifests, deleteManifests)
Contributor

This will cause a substantial performance hit as we will scan all data and delete manifests that match the filter for each changelog snapshot, instead of opening only newly added manifests before. We may have to do that for deletes anyway, but I wonder about data manifests. Let me think a bit.

Contributor Author

I went snapshot by snapshot as it was the easiest to reason with. For sure we need to consider more than just newly added manifests. For each snapshot, we need to consider existing data files if there are new deletes applicable to them.

@aokolnychyi
Contributor

I went through some of my old notes, which we should discuss.

We have the following tasks right now:

  • AddedRowsScanTask (added data file + deletes that happened within the same snapshot).
  • DeletedDataFileScanTask (removed data file + deletes that applied to it before it was removed).
  • DeletedRowsScanTask (data file that was affected by a delete file (if we resolve equality deletes or we had position deletes) + historic deletes that were there before + new deletes added in this snapshot).

First of all, we need to discuss the expected behavior:

  • Do we want to resolve equality deletes and map them into data files? Or should we add a new task and output the content of equality delete files? I'd say we should support both options (without resolving by default?).
  • What if a snapshot adds a new delete file for an already deleted record? That can happen both with position and equality deletes. For instance, if two position-based delete operations remove the same set of records, both of them will succeed. Producing a precise CDC log would require reading all historic deletes, which may be unnecessarily expensive in some cases. I'd say this should be configurable as well.

If we want to make resolving equality deletes optional, we need AddedEqualityDeletesScanTask. We discussed this for Flink CDC use cases. It is going to be costly at planning and query time to apply equality deletes to data files to get removed records. As long as the equality delete persists the entire row or the caller is OK with only equality columns, it should be fine to output the content of equality delete files as is.

I agree with the idea of iterating snapshot by snapshot. I wonder if we can optimize it, though.

AddedRowsScanTask

We don’t have to look up historic deletes as the file was just added, except the position deletes added in the same snapshot. For each added data file, it is enough to look up matching position deletes in DeleteFileIndex built from delete manifests added in this snapshot.

DeletedDataFileScanTask

We have to build DeleteFileIndex that includes all historic deletes for removed data files. We can optimize this step by reading delete manifests only for the affected partitions. We can create a PartitionMap predicate from new data files and use it while reading delete manifests. We can supplement that with a predicate on referencedDataFile in the future. Historic delete files that are not affecting deleted data file partitions can be discarded.

DeletedRowsScanTask

For each position delete file and each equality delete file (if those must be resolved), find matching data files and historic deletes (if configured to resolve double deletes). We can still build a partition predicate from all delete files that were added in this snapshot. We can use that predicate to prune the set of data manifests for that snapshot. For each newly added delete file, output a task for each affected data file (one delete file can match multiple data files).

AddedEqualityDeletesScanTask

Output each added equality delete file as a separate task without matching them to data files (if configured).

@aokolnychyi
Contributor

What I wrote above is more complicated; we need to understand whether that complexity is justified. I am not sure yet.

@pvary
Contributor

pvary commented Oct 17, 2024

First of all, we need to discuss the expected behavior:

  • Do we want to resolve equality deletes and map them into data files? Or should we add a new task and output the content of equality delete files? I'd say we should support both options (without resolving by default?).

If the table is written by Flink, then for Flink CDC streaming reads the lazy solution (not resolving the equality deletes) would be enough. If there is another writer for the table which creates equality delete files that don't conform to Flink's conventions, or the user wants a retracting CDC stream when the table was written by an upsert CDC stream, then equality delete resolution is still needed.

  • What if a snapshot adds a new delete file for the already deleted record? That can happen both with position and equality deletes. For instance, if two position-based delete operations remove the same set of records, both of them will succeed. Producing a precise CDC log would require reading all historic deletes, which may be unnecessary expensive in some cases. I'd say this should be configurable as well.

I think that applying delete files is less costly, so I would stick to the theoretically correct solution and apply previously added delete files to produce the precise CDC log.

@wypoon
Contributor Author

wypoon commented Oct 21, 2024

For instance, if two position-based delete operations remove the same set of records, both of them will succeed.

I was not aware of this. Is this specifically the case of two concurrent delete operations starting from the same base snapshot, both removing the same set of records, and one winning the commit race, but the other would still succeed in committing?
Or is this more general, where a delete that deletes already deleted records is allowed to do so?

Let's say we are in the former case, then I'd say that for the purpose of CDC, the second commit would contain no changes, so the changelog should not write anything for it.

@aokolnychyi
Contributor

aokolnychyi commented Oct 22, 2024

Coming back to some questions above.

Do we want to resolve equality deletes and map them into data files? Or should we add a new task and output the content of equality delete files?

While it would be useful to stream out an ingested CDC log from Flink without applying equality deletes, we probably shouldn't target that for now. If we start outputting equality deletes directly, our changelog may not be accurate. What if we upsert a record that didn't exist? What if an equality delete applied to 10 data records? We can find reasonable behavior, but let's skip this for now. Let's always resolve and assign equality deletes so that the changelog is precise.

What if a snapshot adds a new delete file for the already deleted record? That can happen both with position and equality deletes.

There is one more data point here. We are about to introduce sync maintenance for position deletes. This means new delete files will include old + new deleted positions. Therefore, we must always resolve historic deletes. Engines would have to diff existing and new deletes.

To sum up, I propose sticking to the existing changelog tasks and always resolving historic deletes to produce a correct changelog. I am concerned about full table scans for each incremental snapshot. I'll look into ideas mentioned above to see if we can optimize that.

@wypoon
Contributor Author

wypoon commented Nov 18, 2024

@aokolnychyi I agree that we should stick to existing changelog tasks and always resolve historical deletes to produce the changelog. Have you thought of any optimizations for processing the snapshots? How can we move forward with this?

@wypoon
Contributor Author

wypoon commented Nov 25, 2024

@aokolnychyi I have rebased the PR on main. How can we move forward with this?
If we agree that the behavior is correct, how about we at least have something that works and optimize it later?

6 participants