Commit
KAFKA-8813: Refresh log config if it's updated before initialization (apache#7305)

A partition log is initialized in the following steps:

1. Fetch log config from ZK
2. Call LogManager.getOrCreateLog, which creates the Log object, then
3. Register the Log object

Step #3 enables the configuration update thread to deliver configuration
updates to the log. But if any update arrives between steps #1 and #3,
then that update is missed. It breaks the following use case:

1. Create a topic with the default configuration, and immediately after that
2. Update the configuration of the topic

There is a race condition here, and in some cases the update made in the
second step will get dropped.

This change fixes it by tracking updates arriving between steps #1 and #3.
Once a Partition is done initializing the log, it checks whether it missed any
update. If so, the configuration is read from ZK again.

Added unit tests to make sure a dirty configuration is refreshed. Tested
on a local cluster to make sure that topic configuration and updates are
handled correctly.
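
To make the fix easier to follow, here is a minimal, self-contained sketch of the tracking protocol described above. It is not the actual Kafka code: Tp, Config, InitTracker and the simplified signatures are illustrative stand-ins. An entry is added to a concurrent map before the log is created, the configuration update path flips the entry to dirty if an update arrives in the meantime, and the finishing step re-reads the configuration if the entry was dirtied.

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical stand-ins for TopicPartition and LogConfig, used only in this sketch.
final case class Tp(topic: String, partition: Int)
final case class Config(version: Int)

class InitTracker {
  private val initializing = new ConcurrentHashMap[Tp, Boolean]()

  // Called just before the log is created (between steps #1 and #2): not dirty yet.
  def initializingLog(tp: Tp): Unit =
    initializing.put(tp, false)

  // Called by the configuration update thread when a topic's config changes.
  def topicConfigUpdated(topic: String): Unit =
    initializing.keySet.asScala
      .filter(_.topic == topic)
      .foreach(tp => initializing.replace(tp, false, true))

  // Called once the log is registered (step #3): re-read the config if it went dirty.
  def finishedInitializingLog(tp: Tp, refresh: () => Config): Option[Config] = {
    val dirty = initializing.getOrDefault(tp, false)
    initializing.remove(tp)
    if (dirty) Some(refresh()) else None
  }
}

object RaceDemo extends App {
  val tracker = new InitTracker
  val tp = Tp("events", 0)

  tracker.initializingLog(tp)
  tracker.topicConfigUpdated("events")   // the update races with log creation
  println(tracker.finishedInitializingLog(tp, () => Config(version = 2)))   // Some(Config(2)): the update is not lost
}

The real change follows the same shape: LogManager.initializingLog and finishedInitializingLog bracket getOrCreateLog inside Partition.createLog, while TopicConfigHandler and DynamicLogConfig call topicConfigUpdated and brokerConfigUpdated to flag partitions that are still initializing, as the diffs below show.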

Reviewers: Jason Gustafson <[email protected]>
soondenana authored and hachikuji committed Oct 15, 2019
1 parent 9c80a06 commit 176c934
Showing 9 changed files with 318 additions and 40 deletions.
31 changes: 21 additions & 10 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -313,17 +313,28 @@ class Partition(val topicPartition: TopicPartition,
}
}

private def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val props = stateStore.fetchTopicConfig()
val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica)
val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
// Visible for testing
private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val fetchLogConfig = () => {
val props = stateStore.fetchTopicConfig()
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
}

logManager.initializingLog(topicPartition)
var maybeLog: Option[Log] = None
try {
val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica)
val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
}
val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
maybeLog = Some(log)
log
} finally {
logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig)
}
val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
log
}

def getReplica(replicaId: Int): Option[Replica] = Option(remoteReplicasMap.get(replicaId))
13 changes: 6 additions & 7 deletions core/src/main/scala/kafka/log/Log.scala
@@ -243,16 +243,15 @@ class Log(@volatile var dir: File,
0
}

def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = this.config
this.config = newConfig
if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
if (newRecordVersion.value != oldRecordVersion.value)
initializeLeaderEpochCache()
}
}

private def checkIfMemoryMappedBufferClosed(): Unit = {
55 changes: 52 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -82,6 +82,13 @@ class LogManager(logDirs: Seq[File],
@volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir

// This map contains all partitions whose logs are getting loaded and initialized. If the log configuration
// of any of these partitions gets updated at the same time, the corresponding entry in this map is set to "true",
// which triggers a config reload after initialization is finished (to get the latest config value).
// See KAFKA-8813 for more detail on the race condition.
// Visible for testing
private[log] val partitionsInitializing = new ConcurrentHashMap[TopicPartition, Boolean]().asScala

def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
this._currentDefaultConfig = logConfig
}
@@ -659,6 +666,48 @@ class LogManager(logDirs: Seq[File],
Option(currentLogs.get(topicPartition))
}

/**
* Method to indicate that logs are getting initialized for the partition passed in as argument.
* This method should always be followed by [[kafka.log.LogManager#finishedInitializingLog]] to indicate that log
* initialization is done.
*/
def initializingLog(topicPartition: TopicPartition): Unit = {
partitionsInitializing(topicPartition) = false
}

/**
* Mark the partition configuration as dirty for all partitions of the given topic that are getting
* initialized. That will result in the configuration being reloaded once initialization is done.
*/
def topicConfigUpdated(topic: String): Unit = {
partitionsInitializing.keys.filter(_.topic() == topic).foreach {
topicPartition => partitionsInitializing.replace(topicPartition, false, true)
}
}

/**
* Mark all in-progress partitions as having a dirty configuration when the broker configuration is updated.
*/
def brokerConfigUpdated(): Unit = {
partitionsInitializing.keys.foreach {
topicPartition => partitionsInitializing.replace(topicPartition, false, true)
}
}

/**
* Method to indicate that the log initialization for the partition passed in as argument is
* finished. This method should follow a call to [[kafka.log.LogManager#initializingLog]]
*/
def finishedInitializingLog(topicPartition: TopicPartition,
maybeLog: Option[Log],
fetchLogConfig: () => LogConfig): Unit = {
if (partitionsInitializing(topicPartition)) {
maybeLog.foreach(_.updateConfig(fetchLogConfig()))
}

partitionsInitializing -= topicPartition
}

/**
* If the log already exists, just return a copy of the existing log
* Otherwise if isNew=true or if there is no offline log directory, create a log for the given topic and the given partition
@@ -955,9 +1004,9 @@ class LogManager(logDirs: Seq[File],
def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values

def logsByTopic(topic: String): Seq[Log] = {
(currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, _) =>
topicPartition.topic() == topic
}.map { case (_, log) => log }
(currentLogs.toList ++ futureLogs.toList).collect {
case (topicPartition, log) if topicPartition.topic == topic => log
}
}

/**
19 changes: 13 additions & 6 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -50,20 +50,27 @@ trait ConfigHandler {
*/
class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging {

def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)

val logs = logManager.logsByTopic(topic).toBuffer
private def updateLogConfig(topic: String,
topicConfig: Properties,
configNamesToExclude: Set[String]): Unit = {
logManager.topicConfigUpdated(topic)
val logs = logManager.logsByTopic(topic)
if (logs.nonEmpty) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
topicConfig.asScala.foreach { case (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
logs.foreach(_.updateConfig(topicConfig.asScala.keySet, logConfig))
logs.foreach(_.updateConfig(logConfig))
}
}

def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)

updateLogConfig(topic, topicConfig, configNamesToExclude)

def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
20 changes: 13 additions & 7 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -612,6 +612,18 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
// validation, no additional validation is performed.
}

private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = {
logManager.brokerConfigUpdated()
logManager.allLogs.foreach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults
props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)

val logConfig = LogConfig(props.asJava)
log.updateConfig(logConfig)
}
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val currentLogConfig = logManager.currentDefaultConfig
val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
@@ -626,14 +638,8 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok

logManager.reconfigureDefaultLogConfig(LogConfig(newBrokerDefaults))

logManager.allLogs.foreach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults.asScala
props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
updateLogsConfig(newBrokerDefaults.asScala)

val logConfig = LogConfig(props.asJava)
log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
}
if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
server.kafkaController.enableDefaultUncleanLeaderElection()
}
105 changes: 103 additions & 2 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, ListOffsetRequest}
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.mockito.Mockito.{doNothing, mock, when}
import org.mockito.Mockito.{doAnswer, doNothing, mock, spy, times, verify, when}
import org.scalatest.Assertions.assertThrows
import org.mockito.ArgumentMatchers
import org.mockito.invocation.InvocationOnMock
@@ -1545,12 +1545,113 @@ class PartitionTest {
assertEquals(Set(), Metrics.defaultRegistry().allMetrics().asScala.keySet.filter(_.getType == "Partition"))
}

/**
* Test that when a log is getting initialized, its config remains untouched after initialization is done.
*/
@Test
def testLogConfigNotDirty(): Unit = {
val spyLogManager = spy(logManager)
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
spyLogManager)

partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)

// Validate that initializingLog and finishedInitializingLog were called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation

// We should get config from ZK only once
verify(stateStore).fetchTopicConfig()
}

/**
* Test that when a log is getting initialized, its config gets reloaded if the topic config gets changed
* before initialization is done.
*/
@Test
def testLogConfigDirtyAsTopicUpdated(): Unit = {
val spyLogManager = spy(logManager)
doAnswer(new Answer[Unit] {
def answer(invocation: InvocationOnMock): Unit = {
logManager.initializingLog(topicPartition)
logManager.topicConfigUpdated(topicPartition.topic())
}
}).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))

val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
spyLogManager)

partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)

// Validate that initializingLog and finishedInitializingLog were called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation

// We should get config from ZK twice: once before the log is created, and a second time when
// we find the log config is dirty and refresh it.
verify(stateStore, times(2)).fetchTopicConfig()
}

/**
* Test that when a log is getting initialized, its config gets reloaded if the broker config gets changed
* before initialization is done.
*/
@Test
def testLogConfigDirtyAsBrokerUpdated(): Unit = {
val spyLogManager = spy(logManager)
doAnswer(new Answer[Unit] {
def answer(invocation: InvocationOnMock): Unit = {
logManager.initializingLog(topicPartition)
logManager.brokerConfigUpdated()
}
}).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))

val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
spyLogManager)

partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)

// Validate that initializingLog and finishedInitializingLog were called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
ArgumentMatchers.any(),
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation

// We should get config from ZK twice: once before the log is created, and a second time when
// we find the log config is dirty and refresh it.
verify(stateStore, times(2)).fetchTopicConfig()
}

private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = {
for (i <- 0 until numRecords) {
val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
new SimpleRecord(s"k$i".getBytes, s"v$i".getBytes))
log.appendAsLeader(records, leaderEpoch)
}
}

}
