[HUDI-8394] Restrict multiple bulk inserts into simple bucket COW with disabled Spark native Row #12245

Open
wants to merge 1 commit into base: master
Conversation

@geserdugarov (Contributor) commented Nov 13, 2024

Change Logs

In the case of:

  • bulk insert operation,
  • hoodie.datasource.write.row.writer.enable = false,
  • simple bucket index,

we can bulk insert into a COW table multiple times, but only the first bulk insert produces parquet files; subsequent ones produce log files, despite the table type being COW. To prevent this, a restriction on calling AppendHandleFactory for COW tables is added.

Full discussion is available in #12133.
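
For illustration, a minimal repro sketch of this scenario (e.g. in spark-shell with the Hudi bundle on the classpath; the table name, path and schema below are arbitrary, while the option keys are the documented ones):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-bulk-insert-repro").getOrCreate()
import spark.implicits._

// Simple bucket index on a COW table, bulk insert, Spark native Row writer disabled.
// A single bucket is used so that the second bulk insert necessarily targets an existing bucket.
val hudiOpts = Map(
  "hoodie.table.name" -> "cow_bucket_tbl",
  "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "bulk_insert",
  "hoodie.datasource.write.row.writer.enable" -> "false",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.datasource.write.partitionpath.field" -> "dt",
  "hoodie.index.type" -> "BUCKET",
  "hoodie.bucket.index.hash.field" -> "id",
  "hoodie.bucket.index.num.buckets" -> "1")
val basePath = "/tmp/cow_bucket_tbl"

// First bulk insert: produces parquet base files, as expected for COW.
Seq((1, "a1", 10L, "2021-01-05")).toDF("id", "name", "ts", "dt")
  .write.format("hudi").options(hudiOpts).mode("append").save(basePath)

// Second bulk insert into the same bucket: before this fix it silently produced log files;
// with this fix it fails and points the user to the upsert operation instead.
Seq((2, "a2", 20L, "2021-01-05")).toDF("id", "name", "ts", "dt")
  .write.format("hudi").options(hudiOpts).mode("append").save(basePath)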

Impact

No

Risk level (write none, low, medium or high below)

Low

Documentation Update

No need

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Nov 13, 2024
    } else if (isAppendAllowed) {
      return Option.of(new AppendHandleFactory());
    } else {
      throw new HoodieNotSupportedException("Bulk insert into COW table with bucket index is allowed only once, please, use upsert operation instead");
    }

Contributor:
Could a table be overwritten using bulk_insert more than once? And I think if users need it, they should be able to insert into a table with bulk_insert multiple times, because bulk_insert is more efficient than other operations.

Contributor Author:
Yes, you are right. I forgot to check overwrite. And there are a lot of failed tests in CI. I will check them to look for other cases I missed.

@geserdugarov (Contributor Author), Nov 14, 2024:
Checked that overwrite is working properly, and fixed failed tests.

@danny0405 (Contributor), Nov 18, 2024:
Or better, if we could make bulk_insert behave the same as insert, that would be great: implicitly change bulk_insert into insert for repeated bulk_insert writes.

Because we are kind of assuming INSERT instead of UPSERT semantics for multiple bulk_inserts.
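
(For illustration only: a hypothetical sketch of the implicit downgrade suggested here. This is not what the PR implements, and all names below are made up for the sketch.)

// Hypothetical sketch: downgrade bulk_insert to insert once the table already has data.
sealed trait WriteOperation
case object BulkInsert extends WriteOperation
case object Insert extends WriteOperation

def effectiveOperation(requested: WriteOperation, tableHasCompletedCommits: Boolean): WriteOperation =
  requested match {
    // The first write keeps the fast bulk_insert path; later writes fall back to insert semantics.
    case BulkInsert if tableHasCompletedCommits => Insert
    case other => other
  }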

@geserdugarov (Contributor Author):
@danny0405, I need some time to figure out how RDDSimpleBucketBulkInsertPartitioner should work. It looks like it's not working properly, and possibly there is a separate new bug. But it affects how to make the fix in this PR.

@geserdugarov (Contributor Author), Nov 18, 2024:
Currently, we create some data structures that I find confusing. For instance, in fileIdPfxList we place the prefixes of existing file ids first:

locationMap.forEach((k, v) -> {
  String prefix = FSUtils.getFileIdPfxFromFileId(v.getFileId());
  bucketIdToFileIdPrefixMap.put(k, prefix);
  fileIdPrefixToBucketIndex.put(prefix, fileIdPfxList.size());
  fileIdPfxList.add(prefix);
});

And append the ones that don't exist yet at the end:

// Generate a file that does not exist
for (int i = 0; i < numBuckets; i++) {
  if (!existsBucketID.contains(i)) {
    String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(i, isNonBlockingConcurrencyControl);
    fileIdPrefixToBucketIndex.put(fileIdPrefix, fileIdPfxList.size());
    fileIdPfxList.add(fileIdPrefix);
  }
}

And later in BucketIndexBulkInsertPartitioner, I have to decide what to do with the current record based on this data:
[attached screenshot: cow-as-mor-fileIdPfxList]

Here I've inserted record1 with bucketID = 161. At the debug point, I want to insert record2 with bucketID = 138, but the incoming int partitionId is 1, not 138, and it came from Spark.

I will try to figure out how this should work properly.

Contributor:
There should be two kinds of "partition id" here: one is the Spark partition, the other is the Hudi bucket id (file group id). The reason for resolving a prefix from the file group id is that, for the bucket index, the numeric prefix string essentially is the bucket id from Hudi's point of view; the whole file group id is composed of a number string and a UUID, and the UUID part is essentially inoperative here.
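
(For illustration: a small sketch of the two ids, assuming a file group id of the form "<numeric prefix>-<uuid>" as described above; the helper names are illustrative, not Hudi APIs.)

// Hudi bucket id: parsed from the numeric prefix of the file group id.
def bucketIdOf(fileGroupId: String): Int = fileGroupId.takeWhile(_ != '-').toInt

// fileIdPfxList as built by the partitioner: existing prefixes first, then generated ones.
val fileIdPfxList = Vector("00000161", "00000138")
// Spark partition index: the position of a prefix in fileIdPfxList, not the bucket number.
val sparkPartitionOf: Map[String, Int] = fileIdPfxList.zipWithIndex.toMap

bucketIdOf("00000138-1a2b3c4d-0000")  // 138, the Hudi bucket id
sparkPartitionOf("00000138")          // 1, the Spark partition index the writer sees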

Contributor:
I gave the wrong info before. I realized that we are using the bucket index, and the bucket index is meant only for UPSERT semantics, because the same keys should always be written into one file group. If we allowed multiple bulk_inserts here as inserts, there would be a file slice with multiple base files; this may work, but usually we do not do that. The suggested composition is: bulk_insert (one shot, for better performance) + multiple upserts.

@geserdugarov (Contributor Author):
I suppose we should throw an exception then, in the case when a second bulk insert tries to write to an existing bucket.

Contributor:
For a COW table, I would say yes.

""".stripMargin)
checkExceptionContain(
s"""
| insert into $tableName values

Contributor:
Need to add other operators, like overwrite.

@geserdugarov (Contributor Author), Nov 14, 2024:
@KnightChess, I've added a test for the overwrite operator; it works as expected and allows overwriting a COW table with bulk insert multiple times:
https://github.com/apache/hudi/pull/12245/files#diff-2c06898918dddc19f0b90ecd806d81faf88f30d0ae7bb8be509019b6db9c2accR1753

@geserdugarov geserdugarov force-pushed the master-cow-as-mor branch 2 times, most recently from 4fa05ea to 46449b3 Compare November 14, 2024 08:10
@github-actions github-actions bot added size:M PR with lines of changes in (100, 300] and removed size:S PR with lines of changes in (10, 100] labels Nov 14, 2024
checkExceptionContain(
s"""
| insert into $tableName values
| (9, 'a3,3', 30, 3000, "2021-01-05")

Contributor:
Can you test other partitions? @wuwenchi can you help review this PR?

@geserdugarov geserdugarov changed the title [HUDI-8394] Restrict multiple bulk insert into COW table with bucket index [HUDI-8394] Restrict same partition multiple bulk inserts in append mode into COW with bucket index Nov 15, 2024
@geserdugarov geserdugarov force-pushed the master-cow-as-mor branch 2 times, most recently from 2b24806 to ac34598 Compare November 15, 2024 14:51
@geserdugarov mentioned this pull request Nov 15, 2024
@geserdugarov (Contributor Author) commented Nov 15, 2024

CI is broken on the current master. Some test cases are flaky, but the problem with testSecondaryIndexWithClusteringAndCleaning looks reproducible. Checked it here: #12264

@danny0405 danny0405 self-assigned this Nov 18, 2024
@geserdugarov geserdugarov changed the title [HUDI-8394] Restrict same partition multiple bulk inserts in append mode into COW with bucket index [HUDI-8394] Multiple bulk inserts to the same simple bucket for COW behave as insert Nov 21, 2024
@github-actions github-actions bot added size:S PR with lines of changes in (10, 100] and removed size:M PR with lines of changes in (100, 300] labels Nov 21, 2024
@geserdugarov geserdugarov changed the title [HUDI-8394] Multiple bulk inserts to the same simple bucket for COW behave as insert [HUDI-8394] Restrict multiple BULK INSERTs to the same SIMPLE BUCKET for COW Nov 21, 2024
@github-actions github-actions bot added size:M PR with lines of changes in (100, 300] and removed size:S PR with lines of changes in (10, 100] labels Nov 21, 2024
@@ -124,7 +124,6 @@ public void testExtractRecordKeys() {
String[] s3 = KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);

// keys with ':' are not supported

@geserdugarov (Contributor Author):
Forgot to remove it in #12120 after the review-based changes in expected behavior.

@geserdugarov geserdugarov force-pushed the master-cow-as-mor branch 2 times, most recently from a707447 to c58b416 Compare November 21, 2024 17:52
@geserdugarov (Contributor Author)

Got a CI failure on tests not affected by this PR:

[ERROR] Tests run: 8, Failures: 0, Errors: 1, Skipped: 2, Time elapsed: 71.934 s <<< FAILURE! - in org.apache.hudi.functional.TestStructuredStreaming
[ERROR] testStructuredStreamingWithClustering{boolean}[1]  Time elapsed: 11.999 s  <<< ERROR!
java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:93)
	at org.apache.hudi.common.table.HoodieTableMetaClient.lambda$new$0(HoodieTableMetaClient.java:180)
	at org.apache.hudi.common.util.Option.orElseGet(Option.java:153)
	at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:180)
	at org.apache.hudi.common.table.HoodieTableMetaClient.newMetaClient(HoodieTableMetaClient.java:791)
	at org.apache.hudi.common.table.HoodieTableMetaClient.access$100(HoodieTableMetaClient.java:106)
	at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:899)
	at org.apache.hudi.HoodieDataSourceHelpers.allCompletedCommitsCompactions(HoodieDataSourceHelpers.java:126)
	at org.apache.hudi.functional.TestStructuredStreaming.waitTillAtleastNCommits(TestStructuredStreaming.scala:225)
	at org.apache.hudi.functional.TestStructuredStreaming.$anonfun$structuredStreamingForTestClusteringRunner$1(TestStructuredStreaming.scala:409)
- Test Secondary Index With Updates Compaction Clustering Deletes *** FAILED ***
  org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
  ...
  at org.apache.spark.sql.hudi.command.index.TestSecondaryIndex.validateSecondaryIndex(TestSecondaryIndex.scala:370)

Will try to rebase and restart.

spark.sql(s"insert into $tableName values (5, 'a1,1', 1, 'main'), (6, 'a6,6', 1, 'main'), (9, 'a3,3', 1, 'side')")
// bucket 1 into 'main', bucket 2 into 'side'; the whole insert will fail due to the existing bucket 1 in 'main'
val causeRegex = "Multiple bulk insert.* COW.* not supported.*"
checkExceptionMatch(s"insert into $tableName values (9, 'a3,3', 2, 'main'), (13, 'a13,13', 1, 'side')")(causeRegex)

@geserdugarov (Contributor Author), Nov 22, 2024:
@danny0405, this test illustrates the behavior proposed by this PR. Note that before these changes we could silently write a broken COW table consisting of base and log files.
Also, this exception is thrown only for hoodie.datasource.write.row.writer.enable = 'false' (the option is 'true' by default).

Contributor:
Why does the Spark native Row induce discrepancies here?

@geserdugarov (Contributor Author), Nov 24, 2024:
It's because we have separate processing for this case; I had previously filed HUDI-7757 to resolve this hidden branching in the middle of the huge HoodieSparkSqlWriter::writeInternal:

// Short-circuit if bulk_insert via row is enabled.
// scalastyle:off
if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
return bulkInsertAsRow(client, parameters, hoodieConfig, df, mode, tblName, basePath, writerSchema, tableConfig)
}
// scalastyle:on

but didn't find a quick solution back then, and have run into it again now.

@geserdugarov (Contributor Author), Nov 24, 2024:
And in the case of ENABLE_ROW_WRITER = true we use BaseDatasetBulkInsertCommitActionExecutor instead of SparkBulkInsertCommitActionExecutor.

@geserdugarov (Contributor Author), Nov 24, 2024:
I realize now that we have limits on parquet file size, and when, for instance, the 0001_fileGroup1 file is big enough, we continue writing to another 0001_fileGroup2 file.
So maybe we could allow generating new file groups in the case of multiple bulk inserts. @danny0405, what do you think about it?

@geserdugarov (Contributor Author), Nov 25, 2024:
I checked whether it is possible just to generate a new file group for an existing bucket, and found that there would be problems in HoodieSimpleBucketIndex::loadBucketIdToFileIdMappingForPartition:

throw new HoodieIOException("Find multiple files at partition path="
+ partition + " that belong to the same bucket id = " + bucketId
+ ", these instants need to rollback: " + instants.toString()
+ ", you can use 'rollback_to_instant' procedure to revert the conflicts.");

So it looks like supporting multiple bulk inserts with hoodie.datasource.write.row.writer.enable = 'false' would lead to a lot of changes on the reader side.

In the case of hoodie.datasource.write.row.writer.enable = 'true', HoodieSimpleBucketIndex isn't initialized at all.
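
(For context, a simplified paraphrase of the kind of check that fails there; this is not the actual HoodieSimpleBucketIndex code, just an illustration of why a second file group per bucket is rejected.)

// Simplified sketch: building the bucket id -> file group mapping for one partition.
def loadBucketMapping(fileGroupIds: Seq[String]): Map[Int, String] =
  fileGroupIds.foldLeft(Map.empty[Int, String]) { (mapping, fileGroupId) =>
    val bucketId = fileGroupId.takeWhile(_ != '-').toInt
    mapping.get(bucketId) match {
      case Some(existing) =>
        // Two base files resolving to the same bucket id are treated as a conflict,
        // so simply adding new file groups for an existing bucket is not a quick fix.
        throw new IllegalStateException(
          s"Multiple files for bucket id=$bucketId: $existing and $fileGroupId")
      case None => mapping + (bucketId -> fileGroupId)
    }
  }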

@wombatu-kun (Contributor), Nov 25, 2024:
Quoting the comment above: "And in the case of ENABLE_ROW_WRITER = true we use BaseDatasetBulkInsertCommitActionExecutor instead of SparkBulkInsertCommitActionExecutor."

As I see from the documentation and @danny0405's comment (#12245 (comment)), the bulk_insert operation is "for initial loading/bootstrapping a Hudi table at first" (https://hudi.apache.org/docs/write_operations#bulk_insert), so it should be used only on empty tables. If you've already made one bulk_insert, the table is not empty anymore, so all subsequent write operations must be upserts to avoid any potential issues with data consistency.
So my opinion is: for COW tables with the simple bucket index we should throw an exception on attempting to bulk_insert into a non-empty table, no matter what ENABLE_ROW_WRITER value is configured.
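
(For illustration only: a hypothetical sketch of the restriction proposed in this comment, independent of the row writer setting. It is not code from this PR; all names are illustrative.)

// Hypothetical validation: reject bulk_insert into a non-empty bucketed COW table.
case class WriteContext(operation: String, tableType: String, indexType: String, tableHasCommits: Boolean)

def validateBulkInsert(ctx: WriteContext): Unit = {
  val bucketCow = ctx.tableType == "COPY_ON_WRITE" && ctx.indexType == "BUCKET"
  if (ctx.operation == "bulk_insert" && bucketCow && ctx.tableHasCommits) {
    // Applies regardless of hoodie.datasource.write.row.writer.enable.
    throw new UnsupportedOperationException(
      "bulk_insert into a non-empty COW table with bucket index is not supported; use upsert instead")
  }
}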

@geserdugarov geserdugarov changed the title [HUDI-8394] Restrict multiple BULK INSERTs to the same SIMPLE BUCKET for COW HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and disabled Spark native Row Nov 22, 2024
@geserdugarov geserdugarov changed the title HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and disabled Spark native Row [HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and disabled Spark native Row Nov 22, 2024
@geserdugarov (Contributor Author) commented Nov 22, 2024

After the second CI run, I still got failures not related to this PR:

- Test Secondary Index With Updates Compaction Clustering Deletes *** FAILED ***
  org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
 ...
  at org.apache.spark.sql.hudi.command.index.TestSecondaryIndex.validateSecondaryIndex(TestSecondaryIndex.scala:373)

Couldn't reproduce this issue locally.

And the second problem, also not related to this PR:

Error:  Tests run: 8, Failures: 0, Errors: 1, Skipped: 2, Time elapsed: 64.799 s <<< FAILURE! - in org.apache.hudi.functional.TestStructuredStreaming
Error:  testStructuredStreamingWithClustering{boolean}[2]  Time elapsed: 11.519 s  <<< ERROR!
java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:93)
	at org.apache.hudi.common.table.HoodieTableMetaClient.lambda$new$0(HoodieTableMetaClient.java:180)
	at org.apache.hudi.common.util.Option.orElseGet(Option.java:153)
	at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:180)
	at org.apache.hudi.common.table.HoodieTableMetaClient.newMetaClient(HoodieTableMetaClient.java:794)
	at org.apache.hudi.common.table.HoodieTableMetaClient.access$100(HoodieTableMetaClient.java:106)
	at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:902)
	at org.apache.hudi.HoodieDataSourceHelpers.allCompletedCommitsCompactions(HoodieDataSourceHelpers.java:126)
	at org.apache.hudi.functional.TestStructuredStreaming.waitTillAtleastNCommits(TestStructuredStreaming.scala:225)
	at org.apache.hudi.functional.TestStructuredStreaming.$anonfun$structuredStreamingForTestClusteringRunner$1(TestStructuredStreaming.scala:409)

@geserdugarov geserdugarov changed the title [HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and disabled Spark native Row [HUDI-8394] Restrict multiple bulk inserts into simple bucket COW with disabled Spark native Row Nov 23, 2024
@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

Labels: size:M PR with lines of changes in (100, 300], spark-sql
Projects: Status: 🆕 New
5 participants