
Record batch merging #52

Merged 2 commits on Dec 2, 2024
Conversation

sauliusvl
Contributor

Motivation

The current batch committing algorithm works by preparing record batches, adding them to a bounded FIFO blocking queue and running a separate batch commit thread that takes batches one by one and commits them to storage. The queue size is configurable, and once it is reached the sink blocks until the queue clears up.
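The algorithm above can be sketched as a bounded queue with a single commit thread draining it sequentially. This is a minimal illustration, not the actual stream-loader classes; `Batch`, `SequentialCommitter` and the in-memory "commit" are assumptions for demonstration purposes.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

final case class Batch(id: Int)

object SequentialCommitter {
  // Feeds `n` batches through a bounded FIFO queue and returns the commit order.
  def run(n: Int, queueSize: Int = 5): Vector[Int] = {
    val queue = new ArrayBlockingQueue[Batch](queueSize)
    val committed = ArrayBuffer.empty[Int]

    // The dedicated commit thread: takes batches one by one, sequentially.
    val committer = new Thread(() => {
      var done = 0
      while (done < n) {
        val batch = queue.take() // blocks until a batch is available
        committed += batch.id    // stands in for committing the batch to storage
        done += 1
      }
    })
    committer.start()

    // The sink side: `put` blocks once the queue is full, applying back-pressure.
    (1 to n).foreach(i => queue.put(Batch(i)))
    committer.join()
    committed.toVector
  }
}
```

The bounded queue is what makes the commit thread a bottleneck when batches arrive faster than storage can absorb them, which motivates the merging below.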

In high-throughput situations the sink may end up forming many batches (either because they are too small or because there is too much data) and invoking commits faster than the storage can keep up with. This becomes a bottleneck for the loader and is also detrimental to the storage, which sees a lot of commit activity.

Some storages, e.g. Iceberg, support batch merging: a single batch contains a list of files, so merging batches simply amounts to unioning the files from all batches into a single list and unioning their record ranges. Doing this is a win-win, because there is less queuing in the loader and fewer commits to the Iceberg table.

Implementation

We define a new trait, AppendableRecordBatch, that can be mixed into RecordBatch implementations; users then implement appended(next: B): B to specify how two batches are merged. As discussed above, an IcebergRecordBatch simply merges file lists.
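A minimal sketch of the idea, following the `appended(next: B): B` signature from the description. The `RecordBatch` marker trait and the `FileBatch` fields (file list plus an offset range) are illustrative assumptions, not the actual stream-loader implementation.

```scala
trait RecordBatch

// Mixed into a concrete RecordBatch type B to declare that two batches
// can be merged into one.
trait AppendableRecordBatch[B <: RecordBatch] { self: B =>
  /** Merges `next` into this batch, producing a single combined batch. */
  def appended(next: B): B
}

// Hypothetical Iceberg-style batch: a list of data files plus the record
// range it covers. Appending unions the files and widens the range.
final case class FileBatch(files: List[String], startOffset: Long, endOffset: Long)
    extends RecordBatch
    with AppendableRecordBatch[FileBatch] {
  def appended(next: FileBatch): FileBatch =
    FileBatch(
      files = files ++ next.files,
      startOffset = math.min(startOffset, next.startOffset),
      endOffset = math.max(endOffset, next.endOffset)
    )
}
```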

The RecordBatchingSinker then attempts to merge any queued-up batches that implement AppendableRecordBatch before submitting them to storage.
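The merge step itself reduces to folding the drained batches with `appended`. The sketch below is self-contained and hypothetical; `MergedBatch` and `mergeQueued` are illustrative names, not the actual RecordBatchingSinker internals.

```scala
object BatchMerging {
  // A toy appendable batch: merging just concatenates the file lists.
  final case class MergedBatch(files: List[String]) {
    def appended(next: MergedBatch): MergedBatch =
      MergedBatch(files ++ next.files)
  }

  // Collapses everything currently drained from the queue into a single
  // batch, so only one commit is issued instead of one per batch.
  def mergeQueued(drained: List[MergedBatch]): Option[MergedBatch] =
    drained.reduceOption(_.appended(_))
}
```

Because `appended` preserves queue order inside the fold, the merged batch covers the same records in the same order as committing each batch separately would have.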

To test the implementation we add an extra Iceberg loader integration test that introduces an artificial one-second delay into the local Iceberg catalog commit method, sets the commit queue length to 5 and then rapidly submits 10 record batches, forcing the sink to merge.

Potential future improvements

The batch committing algorithm runs independently for each Kafka partition group, meaning that if multiple threads of the Iceberg sink run in a single process, each one commits batches separately and merging only happens per thread. Technically this could be optimized further by storing the batches from all threads in a single queue and merging globally. That would reduce the number of commits even further and, more importantly, reduce contention: an Iceberg table has to be committed to sequentially, otherwise commits fail with concurrent modification exceptions and have to be retried (which is why the current storage implementation supports locking).

However, this would require more extensive refactoring, which we leave for future work.

Other changes

Bump all versions.

@shivam247 shivam247 merged commit 4a851c1 into adform:master Dec 2, 2024
1 check passed
@sauliusvl sauliusvl deleted the batch-merging branch December 2, 2024 11:07