[HUDI-8394] Restrict multiple bulk inserts into simple bucket COW with disabled Spark native Row #12245
Conversation
    } else if (isAppendAllowed) {
      return Option.of(new AppendHandleFactory());
    } else {
      throw new HoodieNotSupportedException("Bulk insert into COW table with bucket index is allowed only once, please, use upsert operation instead");
What about overwriting a table with bulk_insert multiple times? And I think that if users need to, they should be able to insert into a table with bulk_insert multiple times, because bulk_insert is more efficient than other operations.
Yes, you are right. I missed checking overwrite. And there are a lot of failed tests in CI; I will check them to look for other cases I missed.
Checked that overwrite is working properly, and fixed the failed tests.
Or better, if we can make bulk_insert behave the same as insert, that would be great: implicitly change bulk_insert into insert for multiple bulk_insert writes, because we are kind of assuming INSERT instead of UPSERT semantics for multiple bulk_inserts.
@danny0405, I need some time to figure out how RDDSimpleBucketBulkInsertPartitioner should work. It looks like it's not working properly, and possibly there is a separate new bug, but it affects how to make the fix in this PR.
Currently, we create some strange data structures which I don't fully understand. For instance, in fileIdPfxList we place the prefixes of existing files first:
Lines 88 to 92 in b31c858
    locationMap.forEach((k, v) -> {
      String prefix = FSUtils.getFileIdPfxFromFileId(v.getFileId());
      bucketIdToFileIdPrefixMap.put(k, prefix);
      fileIdPrefixToBucketIndex.put(prefix, fileIdPfxList.size());
      fileIdPfxList.add(prefix);
And append the prefixes that do not exist yet at the end:
Lines 97 to 102 in b31c858
    // Generate a file that does not exist
    for (int i = 0; i < numBuckets; i++) {
      if (!existsBucketID.contains(i)) {
        String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(i, isNonBlockingConcurrencyControl);
        fileIdPrefixToBucketIndex.put(fileIdPrefix, fileIdPfxList.size());
        fileIdPfxList.add(fileIdPrefix);
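To make the resulting ordering concrete, here is a simplified Scala sketch of those two loops; it mirrors the Java code above with plain collections, and the bucket count, existing buckets, and generated prefixes are made up for illustration:

```scala
import scala.collection.mutable

// Hypothetical table with 4 buckets, where buckets 1 and 3 already have file groups.
val numBuckets = 4
val existingBuckets = Map(1 -> "00000001", 3 -> "00000003") // bucketId -> file id prefix

val fileIdPfxList = mutable.ArrayBuffer[String]()
val fileIdPrefixToBucketIndex = mutable.Map[String, Int]()

// First loop: prefixes of existing file groups are added in map-iteration order.
existingBuckets.foreach { case (_, prefix) =>
  fileIdPrefixToBucketIndex(prefix) = fileIdPfxList.size
  fileIdPfxList += prefix
}

// Second loop: prefixes for buckets without files are generated and appended at the end
// (the zero-padded format string stands in for BucketIdentifier.newBucketFileIdPrefix).
(0 until numBuckets).filterNot(existingBuckets.contains).foreach { bucketId =>
  val prefix = f"$bucketId%08d"
  fileIdPrefixToBucketIndex(prefix) = fileIdPfxList.size
  fileIdPfxList += prefix
}

// fileIdPfxList: ArrayBuffer(00000001, 00000003, 00000000, 00000002)
// => the position of a prefix in the list is NOT the bucket id.
println(fileIdPfxList)
println(fileIdPrefixToBucketIndex)
```

If I read the partitioner right, the Spark partition id corresponds to a position in this list, not to the bucket id itself, which would explain the mismatch below.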
And later in BucketIndexBulkInsertPartitioner, I have to decide what to do with the current record based on this data. Here I've inserted record1 with bucketID = 161, and at the debug point I want to insert record2 with bucketID = 138. But the incoming int partitionId = 1, not 138, and it came from Spark. I will try to figure out how it should work properly.
There should be two kinds of "partition id" here: one is the Spark partition, the other is the Hudi bucket id (file group id). The reason to resolve a prefix from the file group id is that, for the bucket index, the numeric prefix string effectively is the bucket id from Hudi's point of view; the whole file group id is composed of a number string and a UUID, and the UUID part is essentially inoperative here.
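A minimal sketch of that decomposition, with a completely made-up file group id (the 8-digit bucket prefix and the UUID tail below are illustrative only, not a real Hudi file id):

```scala
// Hypothetical file group id of a bucket-index file group:
// an 8-digit bucket prefix followed by a UUID that carries no bucket information.
val fileGroupId = "00000138-1b2c3d4e-aaaa-bbbb-cccc-1234567890ab"

val bucketPrefix = fileGroupId.take(8) // "00000138" -> the Hudi bucket id as a string
val bucketId = bucketPrefix.toInt      // 138
val uuidTail = fileGroupId.drop(9)     // inoperative for bucket resolution

println(s"bucket id = $bucketId, uuid tail = $uuidTail")
```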
I kind of gave the wrong info before. I realized that we are using the bucket index, and the bucket index is used only for UPSERT semantics, because the same keys should always be written into one file group. If we allow multiple bulk_inserts here as inserts, there would be a file slice with multiple base files; this may work, but usually we do not do that. The suggested composition is: bulk_insert (one shot for better performance) + multiple upserts, as sketched below.
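A rough DataFrame-API sketch of that composition (one bulk_insert for the initial load, upserts afterwards). The option keys are the standard Hudi datasource ones to the best of my knowledge, and the path, schema, and spark-shell style SparkSession named `spark` are assumptions, so treat this as illustrative rather than authoritative:

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._ // assumes an existing SparkSession named `spark`

val basePath = "/tmp/bucket_cow_tbl"
val hudiOptions = Map(
  "hoodie.table.name" -> "bucket_cow_tbl",
  "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.datasource.write.partitionpath.field" -> "dt",
  "hoodie.index.type" -> "BUCKET",
  "hoodie.bucket.index.num.buckets" -> "4"
)

// One-shot bulk_insert for the initial load: fastest path, no index lookup.
Seq((1, "a1", 1000L, "2021-01-05"), (2, "a2", 1000L, "2021-01-05"))
  .toDF("id", "name", "ts", "dt")
  .write.format("hudi")
  .options(hudiOptions)
  .option("hoodie.datasource.write.operation", "bulk_insert")
  .mode(SaveMode.Overwrite)
  .save(basePath)

// Every subsequent write is an upsert, so each key keeps landing in its single file group.
Seq((1, "a1_updated", 2000L, "2021-01-05"), (3, "a3", 2000L, "2021-01-06"))
  .toDF("id", "name", "ts", "dt")
  .write.format("hudi")
  .options(hudiOptions)
  .option("hoodie.datasource.write.operation", "upsert")
  .mode(SaveMode.Append)
  .save(basePath)
```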
I suppose we should throw an exception then, in the case when a second bulk insert tries to write to an existing bucket.
For a COW table, I would say yes.
""".stripMargin) | ||
checkExceptionContain( | ||
s""" | ||
| insert into $tableName values |
Need to add other operators like overwrite.
@KnightChess, I've added testing of the overwrite operator. It works as expected and allows overwriting a COW table via bulk insert multiple times:
https://github.com/apache/hudi/pull/12245/files#diff-2c06898918dddc19f0b90ecd806d81faf88f30d0ae7bb8be509019b6db9c2accR1753
    checkExceptionContain(
      s"""
         | insert into $tableName values
         | (9, 'a3,3', 30, 3000, "2021-01-05")
Can you test other partitions? @wuwenchi, can you help review this PR?
@KnightChess, I've added a test for other partitions:
https://github.com/apache/hudi/pull/12245/files#diff-2c06898918dddc19f0b90ecd806d81faf88f30d0ae7bb8be509019b6db9c2accR1741
CI is broken on current master. Some test cases are flaky, but the problem with …
@@ -124,7 +124,6 @@ public void testExtractRecordKeys() {
    String[] s3 = KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
    Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);

-   // keys with ':' are not supported
Forgot to remove it in #12120 after the review-based changes in expected behavior.
Got a CI failure on tests not affected by this PR:
Will try to rebase and restart.
spark.sql(s"insert into $tableName values (5, 'a1,1', 1, 'main'), (6, 'a6,6', 1, 'main'), (9, 'a3,3', 1, 'side')") | ||
// bucket 1 into 'main', bucket 2 into 'side', the whole insert will fail due to existed bucket 1 in 'main' | ||
val causeRegex = "Multiple bulk insert.* COW.* not supported.*" | ||
checkExceptionMatch(s"insert into $tableName values (9, 'a3,3', 2, 'main'), (13, 'a13,13', 1, 'side')")(causeRegex) |
@danny0405, this test illustrates the behavior proposed by this PR. Note that before these changes, we could silently write a broken COW table consisting of both base and log files. Also, this exception is thrown only for hoodie.datasource.write.row.writer.enable = 'false', which is 'true' by default.
Why does the Spark native Row writer induce discrepancies here?
It's because we have separate processing for this case. I previously added the corresponding task HUDI-7757 to resolve this hidden branching in the middle of the huge HoodieSparkSqlWriter::writeInternal:
Lines 488 to 493 in f4e810b
    // Short-circuit if bulk_insert via row is enabled.
    // scalastyle:off
    if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
      return bulkInsertAsRow(client, parameters, hoodieConfig, df, mode, tblName, basePath, writerSchema, tableConfig)
    }
    // scalastyle:on
but I didn't find a quick solution back then, and now I've run into it again.
And in the case of ENABLE_ROW_WRITER = true we use BaseDatasetBulkInsertCommitActionExecutor instead of SparkBulkInsertCommitActionExecutor.
I realized now that we have limitations on parquet file size, and when, for instance, the 0001_fileGroup1 file is big enough, we will continue writing to another 0001_fileGroup2 file. So maybe we could allow generating new file groups in the case of multiple bulk inserts. @danny0405, what do you think about it?
I checked whether it is possible to just generate a new file group for an existing bucket, and found that there would be problems in HoodieSimpleBucketIndex::loadBucketIdToFileIdMappingForPartition:
Lines 88 to 91 in f780c92
    throw new HoodieIOException("Find multiple files at partition path="
        + partition + " that belong to the same bucket id = " + bucketId
        + ", these instants need to rollback: " + instants.toString()
        + ", you can use 'rollback_to_instant' procedure to revert the conflicts.");
So it looks like supporting multiple bulk inserts with hoodie.datasource.write.row.writer.enable = 'false' would lead to a lot of changes on the reader side. In the case of hoodie.datasource.write.row.writer.enable = 'true', HoodieSimpleBucketIndex isn't initialized at all.
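To illustrate the invariant that check protects, here is a small standalone sketch (the file ids are made up): each bucket id must resolve to exactly one file group per partition, so a second base file for the same bucket is a conflict rather than something the reader can merge.

```scala
// Hypothetical file ids within one partition; the first 8 digits encode the bucket id.
val fileIdsInPartition = Seq(
  "00000001-aaaa-bbbb-cccc-000000000001",
  "00000002-aaaa-bbbb-cccc-000000000002",
  "00000001-dddd-eeee-ffff-000000000003" // a second file group for bucket 1 -> conflict
)

val bucketIdToFileIds = fileIdsInPartition.groupBy(_.take(8).toInt)

bucketIdToFileIds.foreach { case (bucketId, fileIds) =>
  if (fileIds.size > 1) {
    // This is roughly the situation loadBucketIdToFileIdMappingForPartition rejects.
    throw new IllegalStateException(
      s"Found multiple file groups for bucket id = $bucketId: ${fileIds.mkString(", ")}")
  }
}
```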
And in the case of ENABLE_ROW_WRITER = true we use BaseDatasetBulkInsertCommitActionExecutor instead of SparkBulkInsertCommitActionExecutor.
As I see from the documentation and @danny0405's comment (#12245 (comment)), the bulk_insert operation is "for initial loading/bootstrapping a Hudi table at first" (https://hudi.apache.org/docs/write_operations#bulk_insert), so it should be used only on empty tables. If you've already made one bulk_insert, the table is not empty anymore, so all subsequent write operations must be upserts to avoid potential data consistency issues.
So my opinion is: for COW tables with the simple bucket index we should throw an exception on attempting to bulk_insert into a non-empty table, no matter which ENABLE_ROW_WRITER value is configured.
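To make the proposal concrete, here is a hypothetical shape of such a guard. It is not code from this PR; the function name and parameters (including the tableHasCompletedCommits flag, which in a real implementation would come from the table's timeline) are assumptions for illustration only.

```scala
// Hypothetical pre-write validation for bulk_insert into COW tables with simple bucket index.
def validateBulkInsert(operation: String,
                       tableType: String,
                       indexType: String,
                       tableHasCompletedCommits: Boolean): Unit = {
  val isBulkInsert = operation.equalsIgnoreCase("bulk_insert")
  val isCowBucketTable =
    tableType.equalsIgnoreCase("COPY_ON_WRITE") && indexType.equalsIgnoreCase("BUCKET")
  if (isBulkInsert && isCowBucketTable && tableHasCompletedCommits) {
    // Thrown regardless of hoodie.datasource.write.row.writer.enable.
    throw new UnsupportedOperationException(
      "Bulk insert into a non-empty COW table with simple bucket index is not supported, use upsert instead")
  }
}

// Example: a second bulk_insert into an already-loaded table would be rejected.
// validateBulkInsert("bulk_insert", "COPY_ON_WRITE", "BUCKET", tableHasCompletedCommits = true)
```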
After the second CI run I still got failures not related to this PR:
Couldn't reproduce this issue locally. And the second problem is also not related to this PR:
Change Logs
In the case of hoodie.datasource.write.row.writer.enable = false, we could do bulk insert into a COW table multiple times, but only the first one would produce parquet files; subsequent ones would produce log files, despite the table type being COW. To prevent this, a restriction on calling AppendHandleFactory for COW tables is added. The full discussion is available in #12133.
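For context, a minimal reproduction sketch of the pre-fix behavior (DataFrame API; the path, data, and spark-shell style SparkSession named `spark` are made up, and the option keys are the standard Hudi datasource ones as I recall them):

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._

val basePath = "/tmp/bucket_cow_repro"
val hudiOptions = Map(
  "hoodie.table.name" -> "bucket_cow_repro",
  "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.index.type" -> "BUCKET",
  "hoodie.bucket.index.num.buckets" -> "2",
  "hoodie.datasource.write.operation" -> "bulk_insert",
  // The discrepancy only shows up with the Spark native Row writer disabled.
  "hoodie.datasource.write.row.writer.enable" -> "false"
)

def bulkInsert(rows: Seq[(Int, String, Long)]): Unit =
  rows.toDF("id", "name", "ts")
    .write.format("hudi")
    .options(hudiOptions)
    .mode(SaveMode.Append)
    .save(basePath)

// First bulk_insert: produces base parquet files, as expected for COW.
bulkInsert(Seq((1, "a1", 1000L)))

// Second bulk_insert into the same buckets: before this PR it silently appended log files
// to the COW table; with this PR it should fail instead.
bulkInsert(Seq((2, "a2", 2000L)))
```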
Impact
No
Risk level (write none, low, medium or high below)
Low
Documentation Update
No need
Contributor's checklist