Motivation
The current batch committing algorithm works by preparing record batches, adding them to a FIFO blocking queue, and running a separate batch commit thread that takes batches sequentially one by one and commits them to storage. The queue size is configurable, and once the queue is full the sink blocks until it drains.
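The current scheme can be sketched roughly as follows; `Batch`, `CurrentCommitLoop`, and the method names here are illustrative stand-ins, not the actual types in the codebase:

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

object CurrentCommitLoop {
  final case class Batch(files: List[String])

  // Configurable queue size; the sink blocks once the queue is full
  val maxQueuedBatches = 5
  val queue: BlockingQueue[Batch] = new ArrayBlockingQueue[Batch](maxQueuedBatches)

  // Called by the sink: blocks when the queue is full, back-pressuring the loader
  def enqueue(batch: Batch): Unit =
    queue.put(batch)

  // Placeholder for the actual storage commit
  def commitToStorage(batch: Batch): Unit =
    println(s"committing ${batch.files.size} file(s)")

  // Dedicated commit thread: drains the queue sequentially, one commit per batch
  val committer: Thread = new Thread(() => {
    while (!Thread.currentThread().isInterrupted)
      commitToStorage(queue.take())
  })
}
```

The one-commit-per-batch loop is exactly what becomes the bottleneck described below: when batches arrive faster than `commitToStorage` completes, the queue fills and the sink stalls.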
In high-throughput situations the sink may end up forming many batches (either because they are too small or because there is too much data) and invoking commits faster than the storage can keep up. This becomes a bottleneck for the loader and is detrimental to the storage as well, since it generates lots of commit activity.
Some storages, e.g. Iceberg, support batch merging: a single batch contains a list of files, so merging batches simply amounts to unioning the files from all batches into a single list and unioning their record ranges. Doing this is a win-win, because there is less queuing in the loader and fewer commits to the Iceberg table.
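A minimal sketch of what merging amounts to for an Iceberg-style batch; `FileBatch` and its fields are hypothetical, simplified for illustration:

```scala
// A batch carries a list of data files plus the range of record offsets
// it covers, so merging is just list concatenation plus range union.
final case class FileBatch(files: List[String], firstOffset: Long, lastOffset: Long) {
  def appended(next: FileBatch): FileBatch =
    FileBatch(
      files = files ++ next.files,
      firstOffset = math.min(firstOffset, next.firstOffset),
      lastOffset = math.max(lastOffset, next.lastOffset)
    )
}
```

Merging two such batches produces one batch covering the combined file list and offset range, so a single storage commit replaces two.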
Implementation
We define a new trait `AppendableRecordBatch` that can be mixed into `RecordBatch` implementations; users then have to implement `appended(next: B): B` to specify how batches are merged. As discussed above, an `IcebergRecordBatch` simply merges file lists. The `RecordBatchingSinker` then attempts to merge any queued-up batches that implement `AppendableRecordBatch` before submitting them to storage.

To test the implementation we add an extra Iceberg loader integration test that introduces an artificial delay of 1 second to the local Iceberg catalog's commit method, sets the commit queue length to 5, and then rapidly submits 10 record batches, which forces the sink to do merging.
Potential future improvements
The batch committing algorithm runs independently for each Kafka partition group, meaning that if multiple threads of the Iceberg sink run in a single process, each one commits batches separately and merging only happens per thread. Technically this could be optimized further by storing the batches from all threads in a single queue and merging globally. That would further reduce the number of commits and, more importantly, reduce contention: an Iceberg table has to be committed to sequentially, otherwise commits fail with concurrent-modification exceptions and have to retry (which is why the current storage implementation has locking support).
However, this would require more extensive refactoring, which we leave for future work.
Other changes
Bump all versions.