-
Notifications
You must be signed in to change notification settings - Fork 231
AutoMQ Serverless Cornerstone: Partition Reassignment in a Few Seconds
Apache Kafka, due to its integrated storage and compute architecture, relies heavily on the completion of extensive data synchronization for partition reassignment. For example, a Kafka partition with a throughput of 100MB/s generates about 8.2TB of data per day. If there is a need to reassign this partition to another Broker, it requires full data replication. Even for nodes with a 1 Gbps bandwidth, this reassignment could take hours, almost negating real-time elasticity capabilities of an Apache Kafka cluster.
import EPPBJA from './XUbww6FcPiqmY1kDGfYcg4jonrh/1.png';
Thanks to the separation of storage and compute in AutoMQ for Kafka, no data needs to be moved during partition reassignment, which makes reducing reassignment time to a few seconds possible.
This article will delve into the principles and source code related to AutoMQ's rapid reassignment capabilities and discuss the application scenarios of these capabilities in the end.
As illustrated below, taking the reassignment of partition P1 from Broker-0 to Broker-1 as an example, the process is divided into six steps:
-
Step1 Construct partition reassignment command: Controller (ReplicationControlManager: AlterPartitionReassign) When the Kraft Controller receives the partition reassignment command, it creates a corresponding PartitionChangeRecord and commits it to the Kraft Log layer, removes Broker-0 from the leader replica list, and adds Broker-1 to the follower replica list.
-
Step2 Broker synchronizes partition changes: Broker (ElasticReplicaManager: AsyncApplyDelta) Broker-0 synchronizes with the Kraft Log upon detecting changes to partition P1 and initiates the partition shutdown process.
-
Step3 Metadata persistence and partition Stream closure: Broker (ElasticLog: Close) ElasticLog, a LocalLog implementation based on S3Stream, first persists partition metadata to the Meta Stream (including LeaderEpoch, ProducerSnapshot, SegmentList, StreamIds, etc.), then closes both the Meta and Data Streams.
-
Step4 Data upload and Stream closure: Stream (S3Stream: Close) Upon each Stream closure, if there are data not yet uploaded to object storage, a forced upload is triggered. In a stable cluster, this data usually does not exceed a few hundred MB. Given the burst network bandwidth capabilities provided by current cloud providers, this process typically completes in seconds. Once the Stream’s data is uploaded, it can safely report to the Controller to close the Stream and remove partition P1 from Broker-0.
-
Step5 Proactively retrigger leader election: Controller (ReplicationControlManager: ElectLeader) After P1 closure is completed by the Broker, it proactively triggers a leader election. At this point, Broker-1, as the sole replica, is promoted to the leader of P1, entering the partition recovery process.
-
Step6 Partition recovery and data recovery: Broker (ElasticLog: Apply) During partition recovery, it first reports to the Controller to open P1's corresponding Meta Stream, retrieves P1's metadata from object storage based on the Meta Stream, thereby restoring P1's corresponding checkpoints (Leader Epoch/SegmentList, etc.), and then proceeds with data recovery based on P1's shutdown state (whether it was a cleaned shutdown).
import FDUVUN from './XUbww6FcPiqmY1kDGfYcg4jonrh/2.png';
Next, we delve into a detailed analysis of the six-step process for partition reassignment, using the example of reassigning partition P1 from Broker-0 to Broker-1:
Note: Prior to shutting down a partition, AutoMQ must first report to the Controller to shut down all Streams associated with that partition, putting them into a Closed State. This ensures that the Streams can be reopened in an Opened State when the partition is recovered. This is done to prevent split-brain scenarios (i.e., two Brokers opening the same Stream simultaneously), with the Controller regulating the State and Owner of the Stream.
When the Controller receives the alterPartitionReassignments command, it constructs a PartitionChangeBuilder setting the partition's TargetISR and Replicas to the target [1], but does not elect a Leader immediately, instead opting to delay the election to ensure the partition's corresponding Stream is properly shut down before the election.
Additionally, a leader election timeout is set within the process; if the source Broker fails to trigger the election after a certain period, the Controller will actively trigger the election.
ReplicationControlManager:changePartitionReassignmentV2 {
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
tp.topicId(),
tp.partitionId(),
// no leader election, isAcceptableLeader 直接返回 False,代表不选主
brokerId -> false,
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString())
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
// 设置 ISR、Replicas 为 [target.replicas().get(0)]
builder.setTargetNode(target.replicas().get(0));
TopicControlInfo topicControlInfo = topics.get(tp.topicId());
if (topicControlInfo == null) {
log.warn("unknown topicId[{}]", tp.topicId());
} else {
// 选主超时器
TopicPartition topicPartition = new TopicPartition(topicControlInfo.name, tp.partitionId());
addPartitionToReElectTimeouts(topicPartition);
}
return builder.setDefaultDirProvider(clusterDescriber).build();
}
After the Controller updates the Partition's Replicas, Broker-0, through syncing with the Kraft Log, detects the change in partition P1. This partition no longer belongs to Broker-0, hence initiating the process to shut down the partition.
ElasticReplicaManager: asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage) {
if (!localChanges.deletes.isEmpty) {
val deletes = localChanges.deletes.asScala
.map { tp =>
val isCurrentLeader = Option(delta.image().getTopic(tp.topic()))
.map(image => image.partitions().get(tp.partition()))
.exists(partition => partition.leader == config.nodeId)
val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) && isCurrentLeader
StopPartition(tp, deleteLocalLog = true, deleteRemoteLog = deleteRemoteLog)
}
.toSet
def doPartitionDeletion(): Unit = {
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
deletes.foreach(stopPartition => {
val opCf = doPartitionDeletionAsyncLocked(stopPartition)
opCfList.add(opCf)
})
}
doPartitionDeletion()
}
}
When ReplicasManager calls StopPartition, it cascades the call down to ElasticLog.Close.
ElasticLog is an implementation of LocalLog based on S3Stream, where the partition data and metadata are mapped to S3Stream as follows:
-
Each Segment is mapped to a DataStream
-
The TxnIndex and TimeIndex of a Segment are mapped to Txn Stream and Time Stream respectively
-
The partition metadata (producerSnapshot, LeaderEpoch, Streamids, SegmentList ...) are mapped as key-value pairs into the Meta Stream
import TATKSI from './XUbww6FcPiqmY1kDGfYcg4jonrh/3.png';
ElasticLog first persists the partition metadata to the Meta Stream, then closes both the Meta and Data Streams:
ElasticLog close(): CompletableFuture[Void] = {
// already flush in UnifiedLog#close, so it's safe to set cleaned shutdown.
/// Flagged for Clean Shutdown
partitionMeta.setCleanedShutdown(true)
partitionMeta.setStartOffset(logStartOffset)
partitionMeta.setRecoverOffset(recoveryPoint)
maybeHandleIOException(s"Error while closing $topicPartition in dir ${dir.getParent}") {
// Persisting Metadata
CoreUtils.swallow(persistLogMeta(), this)
CoreUtils.swallow(checkIfMemoryMappedBufferClosed(), this)
CoreUtils.swallow(segments.close(), this)
CoreUtils.swallow(persistPartitionMeta(), this)
}
info("log(except for streams) closed")
// Shut down all Streams corresponding to the partition
closeStreams()
}
Upon closing each Stream:
-
Wait for all unfinished requests
-
If there is still data not uploaded to object storage, a forced upload will be triggered. In a stable cluster, this data usually does not exceed a few hundred MB. With the burst network bandwidth provided by cloud providers, this process typically only takes a few seconds to complete
-
Once the data upload of the Stream is complete, it can safely report to the Controller to shut down the Stream
S3Stream:Close(){
// await all pending append/fetch/trim request
List<CompletableFuture<?>> pendingRequests = new ArrayList<>(pendingAppends);
if (GlobalSwitch.STRICT) {
pendingRequests.addAll(pendingFetches);
}
pendingRequests.add(lastPendingTrim);
CompletableFuture<Void> awaitPendingRequestsCf = CompletableFuture.allOf(pendingRequests.toArray(new CompletableFuture[0]));
CompletableFuture<Void> closeCf = new CompletableFuture<>();
// The Close0 function triggers a forced upload and Stream closure.
awaitPendingRequestsCf.whenComplete((nil, ex) -> propagate(exec(this::close0, LOGGER, "close"), closeCf));
}
private CompletableFuture<Void> close0() {
return storage.forceUpload(streamId)
.thenCompose(nil -> streamManager.closeStream(streamId, epoch));
}
After the shutdown of P1 from Broker, it triggers a leader election:
ElasticReplicaManager:StopPartitions(partitionsToStop: collection.Set[StopPartition]) {
partitionsToStop.foreach { stopPartition =>
val topicPartition = stopPartition.topicPartition
if (stopPartition.deleteLocalLog) {
getPartition(topicPartition) match {
case hostedPartition: HostedPartition.Online =>
if (allPartitions.remove(topicPartition, hostedPartition)) {
maybeRemoveTopicMetrics(topicPartition.topic)
// AutoMQ for Kafka inject start
if (ElasticLogManager.enabled()) {
// For elastic stream, partition leader alter is triggered by setting isr/replicas.
// When broker is not response for the partition, we need to close the partition
// instead of delete the partition.
val start = System.currentTimeMillis()
hostedPartition.partition.close().get()
info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms, trigger leader election")
// Proactively Trigger Leader Election
alterPartitionManager.tryElectLeader(topicPartition)
} else {
// Logs are not deleted here. They are deleted in a single batch later on.
// This is done to avoid having to checkpoint for every deletions.
hostedPartition.partition.delete()
}
// AutoMQ for Kafka inject end
}
case _ =>
}
partitionsToDelete += topicPartition
}
}
In the Controller, Broker-1, being the only replica, is promoted as the leader of P1 and enters the partition recovery process
During Broker partition recovery, it first reports to the Controller to open the Meta Stream corresponding to P1, retrieves the metadata from object storage based on the Meta Stream, thereby restoring the corresponding checkpoint of P1 (Leader Epoch/SegmentList etc.), and then performs data recovery based on the shutdown state of P1 (whether it was a cleaned shutdown).
Code section corresponds to ElasticLog: Apply
- Step 1: Open Meta Stream
metaStream = if (metaNotExists) {
val stream = createMetaStream(client, key, replicationFactor, leaderEpoch, logIdent = logIdent)
info(s"${logIdent}created a new meta stream: stream_id=${stream.streamId()}")
stream
} else {
val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
// open partition meta stream
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
.thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
.get()
info(s"${logIdent}opened existing meta stream: stream_id=$metaStreamId")
stream
}
- Step 2: Pull Partition MetaInfo, Producer Snapshot, and other partition metadata from MetaStream
// load meta info for this partition
val partitionMetaOpt = metaMap.get(MetaStream.PARTITION_META_KEY).map(m => m.asInstanceOf[ElasticPartitionMeta])
if (partitionMetaOpt.isEmpty) {
partitionMeta = new ElasticPartitionMeta(0, 0, 0)
persistMeta(metaStream, MetaKeyValue.of(MetaStream.PARTITION_META_KEY, ElasticPartitionMeta.encode(partitionMeta)))
} else {
partitionMeta = partitionMetaOpt.get
}
info(s"${logIdent}loaded partition meta: $partitionMeta")
//load producer snapshots for this partition
val producerSnapshotsMeta = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta]).getOrElse(new ElasticPartitionProducerSnapshotsMeta())
val snapshotsMap = new ConcurrentSkipListMap[java.lang.Long, ByteBuffer](producerSnapshotsMeta.getSnapshots)
if (!snapshotsMap.isEmpty) {
info(s"${logIdent}loaded ${snapshotsMap.size} producer snapshots, offsets(filenames) are ${snapshotsMap.keySet()} ")
} else {
info(s"${logIdent}loaded no producer snapshots")
}
// load leader epoch checkpoint
val leaderEpochCheckpointMetaOpt = metaMap.get(MetaStream.LEADER_EPOCH_CHECKPOINT_KEY).map(m => m.asInstanceOf[ElasticLeaderEpochCheckpointMeta])
val leaderEpochCheckpointMeta = if (leaderEpochCheckpointMetaOpt.isEmpty) {
val newMeta = new ElasticLeaderEpochCheckpointMeta(LeaderEpochCheckpointFile.CURRENT_VERSION, List.empty[EpochEntry].asJava)
// save right now.
persistMeta(metaStream, MetaKeyValue.of(MetaStream.LEADER_EPOCH_CHECKPOINT_KEY, ByteBuffer.wrap(newMeta.encode())))
newMeta
} else {
leaderEpochCheckpointMetaOpt.get
}
info(s"${logIdent}loaded leader epoch checkpoint with ${leaderEpochCheckpointMeta.entries.size} entries")
if (!leaderEpochCheckpointMeta.entries.isEmpty) {
val lastEntry = leaderEpochCheckpointMeta.entries.get(leaderEpochCheckpointMeta.entries.size - 1)
info(s"${logIdent}last leaderEpoch entry is: $lastEntry")
}
- Step 3: Pull SegmentList from MetaStream and restore all Segment states:
val logMeta: ElasticLogMeta = metaMap.get(MetaStream.LOG_META_KEY).map(m => m.asInstanceOf[ElasticLogMeta]).getOrElse(new ElasticLogMeta())
logStreamManager = new ElasticLogStreamManager(logMeta.getStreamMap, client.streamClient(), replicationFactor, leaderEpoch)
val streamSliceManager = new ElasticStreamSliceManager(logStreamManager)
val logSegmentManager = new ElasticLogSegmentManager(metaStream, logStreamManager, logIdent)
// load LogSegments and recover log
val segments = new CachedLogSegments(topicPartition)
// the state of all elastic log segments is restored through the ElasticLogLoader.
val offsets = new ElasticLogLoader(
logMeta,
segments,
logSegmentManager,
streamSliceManager,
dir,
topicPartition,
config,
time,
hadCleanShutdown = partitionMeta.getCleanedShutdown,
logStartOffsetCheckpoint = partitionMeta.getStartOffset,
partitionMeta.getRecoverOffset,
Optional.empty(),
producerStateManager = producerStateManager,
numRemainingSegments = numRemainingSegments,
createAndSaveSegmentFunc = createAndSaveSegment(logSegmentManager, logIdent = logIdent)).load()
info(s"${logIdent}loaded log meta: $logMeta")
Kafka operators typically prepare Kafka cluster capacity based on historical experience; however, unexpected popular events and activities can cause a sudden surge in cluster traffic. At this point, it becomes necessary to quickly scale up the cluster and reassign partitions to manage the burst traffic.
In Apache Kafka®, due to the tight coupling of storage and compute, scaling the cluster often requires moving Partition data, a process that consumes a significant amount of time and resources, making it inefficient to scale during peak periods.
In AutoMQ, due to the separation of storage and compute, the scaling process does not involve data migration. This means that when rapid scaling is required during peak times, AutoMQ can respond more flexibly, reducing the duration of the scaling process and its impact on operations.
AutoMQ possesses remarkable scaling capabilities, able to complete a scaling process supporting 1GB of traffic within 5 minutes:
import HTHESF from './XUbww6FcPiqmY1kDGfYcg4jonrh/4.png';
Another advantage of the AutoMQ architecture is its ability to perform serverless on-demand scaling.
In traditional architectures, scaling often requires manually adjusting the scale of servers or pre-allocating certain resources. However, the separation of storage and compute in AutoMQ's architecture makes the scaling process more flexible and automated. By leveraging container HPA and cloud providers' auto scaling groups, compute resources can be automatically adjusted based on actual traffic needs without the need to consider data migration issues related to storage. This allows the system to better handle fluctuations in traffic, while also reducing the complexity of operations and hardware costs.
import IUVESN from './XUbww6FcPiqmY1kDGfYcg4jonrh/5.png';
- What is automq: Overview
- Difference with Apache Kafka
- Difference with WarpStream
- Difference with Tiered Storage
- Compatibility with Apache Kafka
- Licensing
- Deploy Locally
- Cluster Deployment on Linux
- Cluster Deployment on Kubernetes
- Example: Produce & Consume Message
- Example: Simple Benchmark
- Example: Partition Reassignment in Seconds
- Example: Self Balancing when Cluster Nodes Change
- Example: Continuous Data Self Balancing
-
S3stream shared streaming storage
-
Technical advantage
- Deployment: Overview
- Runs on Cloud
- Runs on CEPH
- Runs on CubeFS
- Runs on MinIO
- Runs on HDFS
- Configuration
-
Data analysis
-
Object storage
-
Kafka ui
-
Observability
-
Data integration