
AutoMQ Serverless Cornerstone: Partition Reassignment in a Few Seconds

lyx edited this page Jan 17, 2025 · 1 revision

Introduction

Because Apache Kafka couples storage with compute, reassigning a partition requires copying that partition's data to the new Broker. For example, a Kafka partition with a throughput of 100MB/s generates about 8.2TB of data per day. Reassigning this partition to another Broker requires replicating all of that data. Even on nodes with 1 Gbps of bandwidth, the reassignment could take hours, which all but rules out real-time elasticity for an Apache Kafka cluster.
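To make the scale concrete, here is a rough back-of-the-envelope calculation in Java. It is only a sketch: it assumes binary units (MiB, TiB), that one full day of data must be copied, and that the whole 1 Gbps link is available for replication. The class and method names are illustrative and do not come from Kafka or AutoMQ.

```java
// Back-of-the-envelope estimate; all names here are illustrative.
final class ReassignmentCostEstimate {
    static final double SECONDS_PER_DAY = 86_400.0;

    /** Data written per day, in TiB, by a partition ingesting mibPerSecond MiB/s. */
    static double dailyTiB(double mibPerSecond) {
        return mibPerSecond * SECONDS_PER_DAY / (1024.0 * 1024.0);
    }

    /** Hours needed to copy tib TiB over a fully utilized link of gbps gigabits per second. */
    static double copyHours(double tib, double gbps) {
        double bytes = tib * 1024.0 * 1024.0 * 1024.0 * 1024.0;
        double bytesPerSecond = gbps * 1_000_000_000.0 / 8.0;
        return bytes / bytesPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        double tib = dailyTiB(100);
        System.out.printf("daily volume: %.2f TiB%n", tib);
        System.out.printf("copy time at 1 Gbps: %.1f hours%n", copyHours(tib, 1));
    }
}
```

Under these assumptions, one day of data is roughly 8.2 TiB and takes about 20 hours to copy, before accounting for the partition's ongoing writes competing for the same link.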


Because AutoMQ for Kafka separates storage from compute, no data needs to be moved during partition reassignment, which makes it possible to cut reassignment time to a few seconds.

This article examines the principles and source code behind AutoMQ's rapid reassignment and closes with the scenarios where this capability applies.

Overview of the AutoMQ Partition Reassignment Process

Taking the reassignment of partition P1 from Broker-0 to Broker-1 as an example, the process is divided into six steps:

  • Step1 Construct partition reassignment command: Controller (ReplicationControlManager: AlterPartitionReassign) When the KRaft Controller receives the partition reassignment command, it creates a corresponding PartitionChangeRecord and commits it to the KRaft log layer. The record removes Broker-0 from the leader replica list and adds Broker-1 to the follower replica list.

  • Step2 Broker synchronizes partition changes: Broker (ElasticReplicaManager: AsyncApplyDelta) Broker-0 syncs with the KRaft log, detects the change to partition P1, and initiates the partition shutdown process.

  • Step3 Metadata persistence and partition Stream closure: Broker (ElasticLog: Close) ElasticLog, a LocalLog implementation based on S3Stream, first persists partition metadata to the Meta Stream (including LeaderEpoch, ProducerSnapshot, SegmentList, StreamIds, etc.), then closes both the Meta and Data Streams.

  • Step4 Data upload and Stream closure: Stream (S3Stream: Close) Each time a Stream is closed, any data not yet uploaded to object storage triggers a forced upload. In a stable cluster, this data usually does not exceed a few hundred MB. Given the burst network bandwidth offered by current cloud providers, this process typically completes in seconds. Once the Stream's data is uploaded, the Broker can safely report to the Controller to close the Stream and remove partition P1 from Broker-0.

  • Step5 Proactively retrigger leader election: Controller (ReplicationControlManager: ElectLeader) After P1 closure is completed by the Broker, it proactively triggers a leader election. At this point, Broker-1, as the sole replica, is promoted to the leader of P1, entering the partition recovery process.

  • Step6 Partition recovery and data recovery: Broker (ElasticLog: Apply) During partition recovery, Broker-1 first reports to the Controller to open P1's corresponding Meta Stream and retrieves P1's metadata from object storage through the Meta Stream. This restores P1's checkpoints (Leader Epoch, SegmentList, etc.). It then proceeds with data recovery based on P1's shutdown state (whether or not it was a clean shutdown).


AutoMQ Partition Reassignment Source Code Analysis

Next, we delve into a detailed analysis of the six-step process for partition reassignment, using the example of reassigning partition P1 from Broker-0 to Broker-1:

Note: Prior to shutting down a partition, AutoMQ must first report to the Controller to shut down all Streams associated with that partition, putting them into a Closed State. This ensures that the Streams can be reopened in an Opened State when the partition is recovered. This is done to prevent split-brain scenarios (i.e., two Brokers opening the same Stream simultaneously), with the Controller regulating the State and Owner of the Stream.
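The State and Owner bookkeeping described in this note can be pictured with a small Java sketch. It is a simplified model under stated assumptions, not AutoMQ's implementation: the class StreamRegistrySketch, its methods, and the rule that an open request is rejected unless the Stream is Closed are all hypothetical and chosen only to illustrate how a single arbiter rules out two simultaneous owners.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical controller-side registry; not AutoMQ's actual classes.
final class StreamRegistrySketch {
    enum State { OPENED, CLOSED }

    private static final class Entry {
        State state = State.CLOSED;
        int owner = -1;
        long epoch = -1;
    }

    private final Map<Long, Entry> streams = new HashMap<>();

    /** Grants ownership only if the stream is CLOSED and the epoch does not go backwards. */
    synchronized boolean open(long streamId, int nodeId, long epoch) {
        Entry e = streams.computeIfAbsent(streamId, id -> new Entry());
        if (e.state == State.OPENED || epoch < e.epoch) {
            return false;
        }
        e.state = State.OPENED;
        e.owner = nodeId;
        e.epoch = epoch;
        return true;
    }

    /** Only the current owner, presenting the current epoch, may close the stream. */
    synchronized boolean close(long streamId, int nodeId, long epoch) {
        Entry e = streams.get(streamId);
        if (e == null || e.state != State.OPENED || e.owner != nodeId || e.epoch != epoch) {
            return false;
        }
        e.state = State.CLOSED;
        return true;
    }

    public static void main(String[] args) {
        StreamRegistrySketch registry = new StreamRegistrySketch();
        System.out.println("Broker-0 opens: " + registry.open(42L, 0, 1));
        System.out.println("Broker-1 opens while held: " + registry.open(42L, 1, 2));
        System.out.println("Broker-0 closes: " + registry.close(42L, 0, 1));
        System.out.println("Broker-1 opens after close: " + registry.open(42L, 1, 2));
    }
}
```

In this model, Broker-1's open request succeeds only after Broker-0 has closed the Stream, which is the ordering that the reassignment flow relies on.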

Step1: Controller builds the partition reassignment command

When the Controller receives the alterPartitionReassignments command, it constructs a PartitionChangeBuilder that sets the partition's TargetISR and Replicas to the target, [1] (Broker-1 only). It does not elect a Leader immediately; the election is delayed so that the partition's corresponding Streams are properly shut down first.

Additionally, a leader election timeout is set within the process; if the source Broker fails to trigger the election after a certain period, the Controller will actively trigger the election.

ReplicationControlManager: changePartitionReassignmentV2 {

    PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
        tp.topicId(),
        tp.partitionId(),
        // no leader election: isAcceptableLeader always returns false, so no leader is chosen here
        brokerId -> false,
        featureControl.metadataVersion(),
        getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString())
    );
    builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
    builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
    // set ISR and Replicas to [target.replicas().get(0)]
    builder.setTargetNode(target.replicas().get(0));
    TopicControlInfo topicControlInfo = topics.get(tp.topicId());
    if (topicControlInfo == null) {
        log.warn("unknown topicId[{}]", tp.topicId());
    } else {
        // register the leader election timeout for this partition
        TopicPartition topicPartition = new TopicPartition(topicControlInfo.name, tp.partitionId());
        addPartitionToReElectTimeouts(topicPartition);
    }
    return builder.setDefaultDirProvider(clusterDescriber).build();
}
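The delayed election with a timeout fallback can be sketched as a small deadline table. This is a hypothetical model written for illustration: ReElectTimeoutsSketch and its methods are not AutoMQ classes, and the real addPartitionToReElectTimeouts may work differently. Time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a delayed-election timeout table; names are illustrative.
final class ReElectTimeoutsSketch {
    private final long timeoutMs;
    private final Map<String, Long> deadlines = new HashMap<>();

    ReElectTimeoutsSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called when a reassignment record is built: start the clock for this partition. */
    void add(String topicPartition, long nowMs) {
        deadlines.put(topicPartition, nowMs + timeoutMs);
    }

    /** Called when the source broker triggers the election itself; true if a timer was cancelled. */
    boolean electionTriggered(String topicPartition) {
        return deadlines.remove(topicPartition) != null;
    }

    /** Called periodically: returns and removes the partitions whose deadline has passed. */
    List<String> pollExpired(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        ReElectTimeoutsSketch timeouts = new ReElectTimeoutsSketch(1000);
        timeouts.add("P1", 0);
        timeouts.add("P2", 0);
        timeouts.electionTriggered("P1"); // Broker-0 closed P1 in time
        System.out.println("controller must elect for: " + timeouts.pollExpired(1000));
    }
}
```

In this sketch the Controller only has to force an election for partitions whose source Broker never reported back, which matches the fallback role the timeout plays in the text above.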

Step2: Broker synchronizes partition changes

After the Controller updates the Partition's Replicas, Broker-0 detects the change to partition P1 by syncing with the KRaft log. Because the partition no longer belongs to Broker-0, Broker-0 starts shutting it down.

ElasticReplicaManager: asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage) {
    if (!localChanges.deletes.isEmpty) {
      val deletes = localChanges.deletes.asScala
        .map { tp =>
          val isCurrentLeader = Option(delta.image().getTopic(tp.topic()))
            .map(image => image.partitions().get(tp.partition()))
            .exists(partition => partition.leader == config.nodeId)
          val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) && isCurrentLeader
          StopPartition(tp, deleteLocalLog = true, deleteRemoteLog = deleteRemoteLog)
        }
        .toSet
    
      def doPartitionDeletion(): Unit = {
        stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
        deletes.foreach(stopPartition => {
          val opCf = doPartitionDeletionAsyncLocked(stopPartition)
          opCfList.add(opCf)
        })
      }
    
      doPartitionDeletion()
    }    
}

Step3: Broker metadata persistence and partition Stream shutdown

When ElasticReplicaManager stops a partition (StopPartition), the call cascades down to ElasticLog.close.

ElasticLog is an implementation of LocalLog based on S3Stream, where the partition data and metadata are mapped to S3Stream as follows:

  • Each Segment is mapped to a DataStream

  • The TxnIndex and TimeIndex of a Segment are mapped to Txn Stream and Time Stream respectively

  • The partition metadata (ProducerSnapshot, LeaderEpoch, StreamIds, SegmentList ...) is mapped as key-value pairs into the Meta Stream
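One way to picture metadata stored as key-value pairs in a Stream is an append-only log that is replayed on open, with the last write for each key winning. The Java sketch below is an illustrative model only: MetaStreamSketch, its string keys, and its string values are assumptions and do not reflect the real MetaStream encoding.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of an append-only key-value stream; not the real MetaStream class.
final class MetaStreamSketch {
    private static final class Record {
        final String key;
        final String value;

        Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Record> log = new ArrayList<>();

    /** Appends a new version of a key; older versions stay in the log. */
    void append(String key, String value) {
        log.add(new Record(key, value));
    }

    /** Replays the log from the start; the last write for each key wins. */
    Map<String, String> replay() {
        Map<String, String> latest = new HashMap<>();
        for (Record r : log) {
            latest.put(r.key, r.value);
        }
        return latest;
    }

    public static void main(String[] args) {
        MetaStreamSketch meta = new MetaStreamSketch();
        meta.append("PARTITION", "cleanedShutdown=false");
        meta.append("LOG", "segments=[0]");
        meta.append("PARTITION", "cleanedShutdown=true"); // written during close
        System.out.println(meta.replay());
    }
}
```

Because the new owner only needs to replay this small stream, the amount of state it must read before serving the partition is independent of how much message data the partition holds.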


ElasticLog first persists the partition metadata to the Meta Stream, then closes both the Meta and Data Streams:

ElasticLog: close(): CompletableFuture[Void] = {
    // already flush in UnifiedLog#close, so it's safe to set cleaned shutdown.
    // Mark the partition as cleanly shut down
    partitionMeta.setCleanedShutdown(true)
    partitionMeta.setStartOffset(logStartOffset)
    partitionMeta.setRecoverOffset(recoveryPoint)

    maybeHandleIOException(s"Error while closing $topicPartition in dir ${dir.getParent}") {
        // Persisting Metadata
        CoreUtils.swallow(persistLogMeta(), this)
        CoreUtils.swallow(checkIfMemoryMappedBufferClosed(), this)
        CoreUtils.swallow(segments.close(), this)
        CoreUtils.swallow(persistPartitionMeta(), this)
    }
    info("log(except for streams) closed")
    // Shut down all Streams corresponding to the partition
    closeStreams()
}

Step4: Data upload and closure of S3Stream

Upon closing each Stream:

  1. Wait for all unfinished requests

  2. If there is still data not uploaded to object storage, a forced upload will be triggered. In a stable cluster, this data usually does not exceed a few hundred MB. With the burst network bandwidth provided by cloud providers, this process typically only takes a few seconds to complete

  3. Once the data upload of the Stream is complete, it can safely report to the Controller to shut down the Stream

S3Stream: close() {

    // await all pending append/fetch/trim request
    List<CompletableFuture<?>> pendingRequests = new ArrayList<>(pendingAppends);
    if (GlobalSwitch.STRICT) {
        pendingRequests.addAll(pendingFetches);
    }
    pendingRequests.add(lastPendingTrim);
    CompletableFuture<Void> awaitPendingRequestsCf = CompletableFuture.allOf(pendingRequests.toArray(new CompletableFuture[0]));
    CompletableFuture<Void> closeCf = new CompletableFuture<>();
    
    // close0 triggers the forced upload and then closes the Stream.
    awaitPendingRequestsCf.whenComplete((nil, ex) -> propagate(exec(this::close0, LOGGER, "close"), closeCf));
    
}

private CompletableFuture<Void> close0() {
    return storage.forceUpload(streamId)
        .thenCompose(nil -> streamManager.closeStream(streamId, epoch));
}
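The ordering in the excerpt above (wait for pending requests, force the upload, then report the close) can be reproduced with plain CompletableFuture chaining. The following is a self-contained sketch, not AutoMQ code: StreamCloseSketch, forceUpload, and reportClosed are stand-ins for S3Stream, storage.forceUpload, and streamManager.closeStream, and they only record the order in which they run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative stand-in for the close sequence; it records call order instead of doing I/O.
final class StreamCloseSketch {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for storage.forceUpload(streamId).
    CompletableFuture<Void> forceUpload() {
        events.add("forceUpload");
        return CompletableFuture.completedFuture(null);
    }

    // Stand-in for streamManager.closeStream(streamId, epoch).
    CompletableFuture<Void> reportClosed() {
        events.add("reportClosed");
        return CompletableFuture.completedFuture(null);
    }

    /** Waits for all pending requests, then uploads, then reports the close. */
    CompletableFuture<Void> close(List<CompletableFuture<Void>> pending) {
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
            // As with whenComplete in the excerpt, continue even if a pending request failed.
            .handle((nil, ex) -> null)
            .thenCompose(nil -> forceUpload())
            .thenCompose(nil -> reportClosed());
    }

    public static void main(String[] args) {
        StreamCloseSketch stream = new StreamCloseSketch();
        CompletableFuture<Void> pendingAppend = new CompletableFuture<>();
        CompletableFuture<Void> closeCf = stream.close(List.of(pendingAppend));
        System.out.println("closed before append finished? " + closeCf.isDone());
        pendingAppend.complete(null);
        System.out.println("events after append finished: " + stream.events);
    }
}
```

The point of the chaining is that the close is never reported to the Controller before the upload future completes, so a Stream in the Closed State always has its data in object storage.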

Step5: Broker initiates leader election

After Broker-0 finishes closing P1, it triggers a leader election:

ElasticReplicaManager:StopPartitions(partitionsToStop: collection.Set[StopPartition]) {

    partitionsToStop.foreach { stopPartition =>
      val topicPartition = stopPartition.topicPartition
      if (stopPartition.deleteLocalLog) {
        getPartition(topicPartition) match {
          case hostedPartition: HostedPartition.Online =>
            if (allPartitions.remove(topicPartition, hostedPartition)) {
              maybeRemoveTopicMetrics(topicPartition.topic)
              // AutoMQ for Kafka inject start
              if (ElasticLogManager.enabled()) {
                // For elastic stream, partition leader alter is triggered by setting isr/replicas.
                // When broker is not response for the partition, we need to close the partition
                // instead of delete the partition.
                val start = System.currentTimeMillis()
                hostedPartition.partition.close().get()
                info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms, trigger leader election")
                
                // Proactively Trigger Leader Election
                alterPartitionManager.tryElectLeader(topicPartition)
              } else {
                // Logs are not deleted here. They are deleted in a single batch later on.
                // This is done to avoid having to checkpoint for every deletions.
                hostedPartition.partition.delete()
              }
              // AutoMQ for Kafka inject end
            }
    
          case _ =>
        }
        partitionsToDelete += topicPartition
      }
    }
}

In the Controller, Broker-1, being the only replica, is promoted to leader of P1 and enters the partition recovery process.

Step6: Broker partition recovery and data recovery

During partition recovery, the Broker first reports to the Controller to open the Meta Stream corresponding to P1 and retrieves P1's metadata from object storage through that Meta Stream. This restores P1's checkpoints (Leader Epoch, SegmentList, etc.). The Broker then performs data recovery based on P1's shutdown state (whether or not it was a clean shutdown).

The corresponding code is in ElasticLog: Apply:

  • Step 1: Open Meta Stream

metaStream = if (metaNotExists) {
    val stream = createMetaStream(client, key, replicationFactor, leaderEpoch, logIdent = logIdent)
    info(s"${logIdent}created a new meta stream: stream_id=${stream.streamId()}")
    stream
} else {
    val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
    // open partition meta stream
    val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
        .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
        .get()
    info(s"${logIdent}opened existing meta stream: stream_id=$metaStreamId")
    stream
}

  • Step 2: Pull Partition MetaInfo, Producer Snapshot, and other partition metadata from MetaStream

// load meta info for this partition
val partitionMetaOpt = metaMap.get(MetaStream.PARTITION_META_KEY).map(m => m.asInstanceOf[ElasticPartitionMeta])
if (partitionMetaOpt.isEmpty) {
    partitionMeta = new ElasticPartitionMeta(0, 0, 0)
    persistMeta(metaStream, MetaKeyValue.of(MetaStream.PARTITION_META_KEY, ElasticPartitionMeta.encode(partitionMeta)))
} else {
    partitionMeta = partitionMetaOpt.get
}
info(s"${logIdent}loaded partition meta: $partitionMeta")

//load producer snapshots for this partition
val producerSnapshotsMeta = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta]).getOrElse(new ElasticPartitionProducerSnapshotsMeta())
val snapshotsMap = new ConcurrentSkipListMap[java.lang.Long, ByteBuffer](producerSnapshotsMeta.getSnapshots)
if (!snapshotsMap.isEmpty) {
    info(s"${logIdent}loaded ${snapshotsMap.size} producer snapshots, offsets(filenames) are ${snapshotsMap.keySet()} ")
} else {
    info(s"${logIdent}loaded no producer snapshots")
}

// load leader epoch checkpoint
val leaderEpochCheckpointMetaOpt = metaMap.get(MetaStream.LEADER_EPOCH_CHECKPOINT_KEY).map(m => m.asInstanceOf[ElasticLeaderEpochCheckpointMeta])
val leaderEpochCheckpointMeta = if (leaderEpochCheckpointMetaOpt.isEmpty) {
    val newMeta = new ElasticLeaderEpochCheckpointMeta(LeaderEpochCheckpointFile.CURRENT_VERSION, List.empty[EpochEntry].asJava)
    // save right now.
    persistMeta(metaStream, MetaKeyValue.of(MetaStream.LEADER_EPOCH_CHECKPOINT_KEY, ByteBuffer.wrap(newMeta.encode())))
    newMeta
} else {
    leaderEpochCheckpointMetaOpt.get
}
info(s"${logIdent}loaded leader epoch checkpoint with ${leaderEpochCheckpointMeta.entries.size} entries")
if (!leaderEpochCheckpointMeta.entries.isEmpty) {
    val lastEntry = leaderEpochCheckpointMeta.entries.get(leaderEpochCheckpointMeta.entries.size - 1)
    info(s"${logIdent}last leaderEpoch entry is: $lastEntry")
}

  • Step 3: Pull SegmentList from MetaStream and restore all Segment states:

val logMeta: ElasticLogMeta = metaMap.get(MetaStream.LOG_META_KEY).map(m => m.asInstanceOf[ElasticLogMeta]).getOrElse(new ElasticLogMeta())
logStreamManager = new ElasticLogStreamManager(logMeta.getStreamMap, client.streamClient(), replicationFactor, leaderEpoch)
val streamSliceManager = new ElasticStreamSliceManager(logStreamManager)

val logSegmentManager = new ElasticLogSegmentManager(metaStream, logStreamManager, logIdent)

// load LogSegments and recover log
val segments = new CachedLogSegments(topicPartition)
// the state of all elastic log segments is restored through the ElasticLogLoader.
val offsets = new ElasticLogLoader(
    logMeta,
    segments,
    logSegmentManager,
    streamSliceManager,
    dir,
    topicPartition,
    config,
    time,
    hadCleanShutdown = partitionMeta.getCleanedShutdown,
    logStartOffsetCheckpoint = partitionMeta.getStartOffset,
    partitionMeta.getRecoverOffset,
    Optional.empty(),
    producerStateManager = producerStateManager,
    numRemainingSegments = numRemainingSegments,
    createAndSaveSegmentFunc = createAndSaveSegment(logSegmentManager, logIdent = logIdent)).load()
info(s"${logIdent}loaded log meta: $logMeta")
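The role of the clean-shutdown flag in recovery can be sketched as a simple decision. The Java below is an assumption-laden illustration modeled on the general pattern in upstream Kafka log loading, where a clean shutdown lets the loader skip re-validating records past the recovery point. RecoveryPlanSketch and recoveryStartOffset are hypothetical names, and ElasticLogLoader's actual logic may differ.

```java
// Hypothetical decision helper; ElasticLogLoader's real logic may differ.
final class RecoveryPlanSketch {
    /**
     * Returns the offset from which records would need to be re-validated,
     * or -1 when no data recovery is needed.
     */
    static long recoveryStartOffset(boolean hadCleanShutdown, long recoverOffset, long logEndOffset) {
        if (hadCleanShutdown || recoverOffset >= logEndOffset) {
            return -1L; // metadata is trusted as-is
        }
        return recoverOffset; // re-check everything written after the last recovery point
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + recoveryStartOffset(true, 100L, 250L));
        System.out.println("unclean shutdown: " + recoveryStartOffset(false, 100L, 250L));
    }
}
```

In a planned reassignment the source Broker sets the clean-shutdown flag before closing, so under this model the new leader takes the branch that skips data recovery entirely.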

Application scenarios of partition reassignment in a few seconds

1) Rapid scaling during peak periods

Kafka operators typically prepare Kafka cluster capacity based on historical experience; however, unexpected popular events and activities can cause a sudden surge in cluster traffic. At this point, it becomes necessary to quickly scale up the cluster and reassign partitions to manage the burst traffic.

In Apache Kafka®, due to the tight coupling of storage and compute, scaling the cluster often requires moving Partition data, a process that consumes a significant amount of time and resources, making it inefficient to scale during peak periods.

In AutoMQ, due to the separation of storage and compute, the scaling process does not involve data migration. This means that when rapid scaling is required during peak times, AutoMQ can respond more flexibly, reducing the duration of the scaling process and its impact on operations.

AutoMQ scales quickly, completing a scale-out that supports 1GB of traffic within 5 minutes:


2) Serverless on-demand scaling

Another advantage of the AutoMQ architecture is its ability to perform serverless on-demand scaling.

In traditional architectures, scaling often requires manually adjusting the scale of servers or pre-allocating certain resources. However, the separation of storage and compute in AutoMQ's architecture makes the scaling process more flexible and automated. By leveraging container HPA and cloud providers' auto scaling groups, compute resources can be automatically adjusted based on actual traffic needs without the need to consider data migration issues related to storage. This allows the system to better handle fluctuations in traffic, while also reducing the complexity of operations and hardware costs.
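As a rough illustration of traffic-driven scaling, the sketch below applies the replica formula documented for the Kubernetes Horizontal Pod Autoscaler, desired = ceil(current × currentMetric / targetMetric), to a Broker count. It is a simplification: the real HPA also applies a tolerance band and stabilization windows, and the class name, the per-Broker throughput metric, and the bounds are assumptions made for this example.

```java
// Simplified HPA-style calculation; names, metric, and bounds are illustrative.
final class BrokerScalingSketch {
    /**
     * desired = ceil(currentBrokers * currentPerBroker / targetPerBroker),
     * clamped to [minBrokers, maxBrokers].
     */
    static int desiredBrokers(int currentBrokers, double currentPerBroker,
                              double targetPerBroker, int minBrokers, int maxBrokers) {
        int desired = (int) Math.ceil(currentBrokers * (currentPerBroker / targetPerBroker));
        return Math.max(minBrokers, Math.min(maxBrokers, desired));
    }

    public static void main(String[] args) {
        // 3 Brokers averaging 90 MB/s each against a 60 MB/s target.
        System.out.println("scale out to: " + desiredBrokers(3, 90.0, 60.0, 3, 20));
        // 4 Brokers averaging 30 MB/s each against the same target.
        System.out.println("scale in to: " + desiredBrokers(4, 30.0, 60.0, 3, 20));
    }
}
```

Because partitions can be reassigned in seconds without moving data, acting on such a recommendation in either direction is cheap, which is what makes this kind of automation practical.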

