AutoMQ: The Secret of Efficient Data Organization in Object Storage: Compaction
AutoMQ, an object storage-based messaging system, accumulates data from all partitions in memory on the write path while simultaneously persisting it to EBS. Once the accumulated data reaches a predefined batch size, the batch is uploaded to object storage. With this approach, the API call cost and the number of files in object storage scale with throughput rather than growing linearly with the number of partitions, as depicted in the subsequent figure:
Uploading a batch to object storage can produce two types of objects (see "How AutoMQ Achieves 100% Protocol Compatibility with Apache Kafka" [3] for details on how partitions map to streams). For readers unfamiliar with that mapping, it is enough to assume that the data of one partition corresponds to one stream:
-
Stream Set Object (abbreviated as SSO): An object that contains consecutive data segments from multiple streams
-
Stream Object (abbreviated as SO): An object that contains consecutive data segments from a single stream
During an upload, if the data from a single stream exceeds a predefined length threshold, it is uploaded directly as a Stream Object (SO). The residual data from the remaining streams is consolidated into one Stream Set Object (SSO), arranged in ascending order of stream ID, as illustrated in the subsequent diagram:
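The upload rule above can be sketched as follows. This is a minimal illustration rather than AutoMQ's implementation: the name `plan_upload`, the threshold value of 50, and the choice to measure size in offset units are all assumptions made for the example.

```python
from collections import defaultdict

# Hypothetical threshold; the article does not state the real value.
SO_THRESHOLD = 50


def plan_upload(records, so_threshold=SO_THRESHOLD):
    """Decide which cached data becomes an SO and which goes into the SSO.

    records: list of (stream_id, start_offset, end_offset) tuples.
    Assumes each stream's cached data is one contiguous offset range.
    Returns (stream_objects, stream_set_object).
    """
    by_stream = defaultdict(list)
    for stream_id, start, end in records:
        by_stream[stream_id].append((start, end))

    stream_objects = []     # one SO per stream that exceeds the threshold
    stream_set_object = []  # residual data, in ascending stream ID order
    for stream_id in sorted(by_stream):
        ranges = sorted(by_stream[stream_id])
        size = sum(end - start for start, end in ranges)
        merged = (stream_id, ranges[0][0], ranges[-1][1])
        if size > so_threshold:
            stream_objects.append(merged)
        else:
            stream_set_object.append(merged)
    return stream_objects, stream_set_object


records = [(2, 0, 10), (0, 100, 180), (1, 5, 25), (0, 180, 200)]
sos, sso = plan_upload(records)
print(sos)  # stream 0 is large enough to become its own SO
print(sso)  # streams 1 and 2 share one SSO, ordered by stream ID
```

Sorting the residual data by stream ID at upload time is what later lets compaction walk each SSO index in a predictable order.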
Similar to the LSM-Tree Compaction mechanism [4], AutoMQ's Compaction is primarily used for data cleanup, reducing the amount of metadata, and increasing data cohesion to improve read performance.
-
Data Cleanup: Compaction is used to delete expired partition data.
-
Reducing Metadata Amount: By compacting multiple small objects into larger ones, the amount of metadata that needs to be maintained is effectively reduced.
-
Enhancing Read Performance: In Apache Kafka®'s file structure, retrieving historical data from a specific partition only requires locating the associated segment file. With AutoMQ's batch write approach, however, when there are many partitions a single SSO may hold only a small data segment from any given partition. Consuming a range of historical data from one partition then requires API calls to multiple SSOs, which raises call costs and can reduce cold read throughput. With Compaction, data from the same partition is consolidated into as few objects as possible, which improves consumption performance.
AutoMQ implements two levels of Compaction:
-
SSO Compaction: Compact multiple SSOs into no more than one SSO and multiple SOs.
-
SO Compaction: Combine multiple SOs belonging to the same Stream into a larger SO.
Due to space limitations, this article will focus on SSO Compaction.
At the beginning of SSO Compaction, all SSOs generated by the current node are retrieved, and the index files of each SSO are read to parse the Streams and corresponding data ranges within each object. During this process, expired data segments of each Stream are directly ignored. The compaction process of the three SSOs shown in the figure below will be taken as an example (note that the length of the color blocks in the figure only indicates the length of the corresponding data segments; the actual data segments are not read at this step):
After retrieving the indexes of each SSO, they are sorted by Stream Id in ascending order and by the data offset within the same Stream in ascending order:
Once sorted, consecutive data segments of the same Stream that exceed the split threshold will be split into individual SOs, and the remaining data segments will form new SSOs:
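The sort-and-split step can be sketched as below. The segment layout is reconstructed from the read walkthrough later in this article; the split threshold of 80 and the names `contiguous_runs` and `plan_split` are assumptions for illustration, not AutoMQ's actual values or API.

```python
SPLIT_THRESHOLD = 80  # hypothetical value, not stated in the article

# (stream_id, start_offset, end_offset, source_object), as parsed from the
# three SSO indexes in the example.
INDEX = [
    (0, 0, 20, "SSO-0"), (1, 30, 60, "SSO-0"), (3, 210, 230, "SSO-0"),
    (0, 20, 25, "SSO-1"), (1, 60, 120, "SSO-1"),
    (2, 400, 500, "SSO-2"), (3, 230, 270, "SSO-2"),
]


def contiguous_runs(ordered):
    """Group sorted segments into runs that are consecutive within a stream."""
    runs = []
    for seg in ordered:
        last = runs[-1][-1] if runs else None
        if last and last[0] == seg[0] and last[2] == seg[1]:
            runs[-1].append(seg)
        else:
            runs.append([seg])
    return runs


def plan_split(index, split_threshold=SPLIT_THRESHOLD):
    # Sort by stream ID, then by offset within the same stream.
    ordered = sorted(index, key=lambda s: (s[0], s[1]))
    stream_objects, new_sso = [], []
    for run in contiguous_runs(ordered):
        length = run[-1][2] - run[0][1]
        if length > split_threshold:
            stream_objects.append(run)  # becomes an individual SO
        else:
            new_sso.extend(run)         # stays in the new SSO
    return stream_objects, new_sso


sos, sso = plan_split(INDEX)
print([(run[0][0], run[0][1], run[-1][2]) for run in sos])
print(sso)
```

With this assumed threshold, S1 (30-120) and S2 (400-500) are split out as SOs, while S0 and S3 remain in the new SSO, which matches the outcome described in the example.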
Since Compaction is a periodic task (default period is 20 minutes), for a high-traffic online cluster, the data volume of SSOs covered in each Compaction may reach hundreds of GB or more. It is almost impossible to pull all this data locally for Compaction in one go. Therefore, AutoMQ will divide the Compaction into multiple iterations based on the maximum memory space available for the Compaction task. After each iteration is completed, the memory data is cleared before starting the next iteration, thereby achieving large-scale data Compaction within a controllable memory space. Still using the figure above as an example, assuming the memory limit for Compaction is 150, this Compaction will be completed in two iterations:
In the first iteration, the two data segments from S0 will be uploaded as the first part of SSO-3. The two data segments from S1 (30-60, 60-120) will be merged into one object (SO-0) and uploaded. The S2 data segment (400-500) only partially fits within the memory limit of the first iteration, so it is truncated into two SOs: the front part that fits (S2 400-435) is uploaded in the first iteration.
In the second iteration, the remaining part of the previously truncated S2 data segment (435-500) will be uploaded as a separate SO, and the data segments from S3 will be uploaded as the second part of SSO-3.
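The way the plan is cut into iterations can be reproduced with a short sketch. It assumes that segment length in offset units stands in for memory usage, as in the figure, and that a segment crossing the budget boundary is simply truncated; `plan_iterations` is a hypothetical name, not AutoMQ's API.

```python
def plan_iterations(segments, memory_limit):
    """Split sorted segments into iterations that each fit in memory_limit.

    segments: list of (stream_id, start_offset, end_offset), already sorted
    by stream ID and offset. A segment that does not fit in the remaining
    budget is truncated, and its tail moves to the next iteration.
    """
    if memory_limit <= 0:
        raise ValueError("memory_limit must be positive")

    iterations, current, used = [], [], 0
    for stream_id, start, end in segments:
        while start < end:
            if used == memory_limit:
                # Budget exhausted: close this iteration and start a new one.
                iterations.append(current)
                current, used = [], 0
            take = min(memory_limit - used, end - start)
            current.append((stream_id, start, start + take))
            used += take
            start += take
    if current:
        iterations.append(current)
    return iterations


SORTED_SEGMENTS = [
    (0, 0, 20), (0, 20, 25),
    (1, 30, 60), (1, 60, 120),
    (2, 400, 500),
    (3, 210, 230), (3, 230, 270),
]

for number, iteration in enumerate(plan_iterations(SORTED_SEGMENTS, 150), 1):
    print(number, iteration)
```

With a limit of 150, the first iteration holds 25 + 90 + 35 units, so S2 is cut at offset 435, and the second iteration holds the remaining 65 units of S2 plus the 60 units of S3.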
After the iteration plan is formulated, actual read/write requests can be initiated. To minimize the API call cost of object storage, the data segments to be read in each iteration will be grouped by the object they belong to before the iteration starts. Since the compaction iteration order is inherently sorted by Stream Id -> Offset, adjacent data segments in an SSO can be merged into a single API read. Once the data segments in an iteration are read locally and assembled, the upload can be triggered. After all objects to be produced in an iteration are uploaded, the data segments read into memory for that iteration can be cleared to make room for the next iteration. Taking the two iterations mentioned above as examples:
- First iteration:
-
Asynchronously initiate reads to three SSOs:
-
SSO-0 performs a Batch Read to read the S0 (0-20) and S1 (30-60) data segments
-
SSO-1 performs a Batch Read to read the S0 (20-25) and S1 (60-120) data segments
-
SSO-2 performs a Batch Read to read the S2 (400-435) data segment
-
After reading S0 (0-20) and S0 (20-25), they are uploaded as the first part of SSO-3
-
After reading S1 (30-60) and S1 (60-120), they are uploaded through Multi-Part Upload to complete SO-0's upload
-
After reading S2 (400-435), it is uploaded through PutObject to complete SO-1's upload
- Second iteration:
-
Initiate asynchronous reads for two SSOs:
-
SSO-0 performs a Batch Read to retrieve the S3 (210-230) segment.
-
SSO-2 performs a Batch Read to retrieve the S2 (435-500) and S3 (230-270) segments.
-
Once the S2 (435-500) read is complete, perform a Multi-Part Upload to finish the SO-2 upload.
-
Once the S3 (210-230) and S3 (230-270) reads are complete, upload them as the final part of SSO-3.
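The grouping of reads by source object, with adjacent segments merged into one request, can be sketched as follows. The byte positions inside each SSO are hypothetical, since the article only gives stream offsets, and `plan_reads` is an illustrative name rather than AutoMQ's API. Each merged range would map to a single ranged read against object storage.

```python
from collections import defaultdict


def plan_reads(segments):
    """Group segments by source object and merge physically adjacent ranges.

    segments: list of (object_id, byte_start, byte_end) to read in one
    iteration. Returns {object_id: [(byte_start, byte_end), ...]}, where
    each tuple corresponds to one read request.
    """
    by_object = defaultdict(list)
    for object_id, start, end in segments:
        by_object[object_id].append((start, end))

    plan = {}
    for object_id, ranges in by_object.items():
        merged = []
        for start, end in sorted(ranges):
            if merged and merged[-1][1] == start:
                # Adjacent to the previous range: extend it.
                merged[-1] = (merged[-1][0], end)
            else:
                merged.append((start, end))
        plan[object_id] = merged
    return plan


# Assumed physical layout for the second iteration:
#   SSO-0 holds S3 (210-230) at bytes 50-70
#   SSO-2 holds S2 (435-500) at bytes 35-100 and S3 (230-270) at bytes 100-140
second_iteration = [("SSO-0", 50, 70), ("SSO-2", 35, 100), ("SSO-2", 100, 140)]
print(plan_reads(second_iteration))
```

Under this assumed layout, the two segments needed from SSO-2 sit next to each other, so they collapse into one read of bytes 35-140, while SSO-0 needs a single read of its own.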
When all iterations are completed, all objects generated during this compaction are stored in the object storage. At this point, the Broker node will initiate a commit request to the Controller, marking the compacted objects for deletion and replacing the metadata with the newly generated object indexes. If the compaction process is terminated due to node offline issues or other anomalies, the objects generated during this compaction will be cleaned up after the commit timeout period.
This article outlines how AutoMQ accomplishes large-scale SSO compaction within limited memory. Beyond the scope of this discussion, AutoMQ also supports features such as Force Split, hierarchical throttling for compaction, and SO compaction based on UploadPartCopy [5]. Detailed coverage of these features is omitted due to space limitations. Interested readers are invited to explore the AutoMQ code repository for a comprehensive understanding.
[1] KIP-405: Kafka Tiered Storage: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
[2] S3Stream: https://github.com/AutoMQ/automq/tree/main/s3stream
[3] How AutoMQ Achieves 100% Protocol Compatibility with Apache Kafka: https://www.automq.com/blog/how-automq-makes-apache-kafka-100-protocol-compatible
[4] Log-structured merge-tree: https://en.wikipedia.org/wiki/Log-structured_merge-tree
[5] AWS S3 UploadPartCopy: https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html