Skip to content

Commit

Permalink
KAFKA-14912:Add a dynamic config for remote index cache size (apache#…
Browse files Browse the repository at this point in the history
…14381)

Reviewers: Luke Chen <[email protected]>, Satish Duggana <[email protected]>, Kamal Chandraprakash<[email protected]>, Divij Vaidya <[email protected]>, Subhrodip Mohanta <[email protected]>
  • Loading branch information
hudeqi authored Oct 8, 2023
1 parent 354c9ca commit 1c3eb43
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 43 deletions.
6 changes: 5 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,

remoteLogStorageManager = createRemoteStorageManager();
remoteLogMetadataManager = createRemoteLogMetadataManager();
indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());

Expand All @@ -211,6 +211,10 @@ public Double value() {
);
}

public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
}

private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
remoteStorageReaderThreadPool.removeMetrics();
Expand Down
52 changes: 51 additions & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ object DynamicBrokerConfig {
Set(KafkaConfig.MetricReporterClassesProp) ++
DynamicListenerConfig.ReconfigurableConfigs ++
SocketServer.ReconfigurableConfigs ++
ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala ++
DynamicRemoteLogConfig.ReconfigurableConfigs

private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp, KafkaConfig.NumNetworkThreadsProp)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
Expand Down Expand Up @@ -271,6 +272,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
addBrokerReconfigurable(new DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
addBrokerReconfigurable(new DynamicRemoteLogConfig(kafkaServer))
}

/**
Expand Down Expand Up @@ -1129,3 +1131,51 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala

}

class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
override def reconfigurableConfigs: Set[String] = {
DynamicRemoteLogConfig.ReconfigurableConfigs
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
if (reconfigurableConfigs.contains(k)) {
if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
val newValue = v.asInstanceOf[Long]
val oldValue = getValue(server.config, k)
if (newValue != oldValue && newValue <= 0) {
val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
throw new ConfigException(s"$errorMsg, value should be at least 1")
}
}
}
}
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
if (oldValue != newValue) {
val remoteLogManager = server.remoteLogManagerOpt
if (remoteLogManager.nonEmpty) {
remoteLogManager.get.resizeCacheSize(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
}
}
}

private def getValue(config: KafkaConfig, name: String): Long = {
name match {
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
}
}
}

object DynamicRemoteLogConfig {
val ReconfigurableConfigs = Set(
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
)
}
107 changes: 85 additions & 22 deletions core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable

class RemoteIndexCacheTest {
private val defaultRemoteIndexCacheSizeBytes = 1024 * 1024L
private val logger: Logger = LoggerFactory.getLogger(classOf[RemoteIndexCacheTest])
private val time = new MockTime()
private val brokerId = 1
Expand All @@ -64,24 +65,9 @@ class RemoteIndexCacheTest {
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))

cache = new RemoteIndexCache(rsm, tpDir.toString)
cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString)

when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
mockRsmFetchIndex(rsm)
}

@AfterEach
Expand Down Expand Up @@ -183,9 +169,10 @@ class RemoteIndexCacheTest {

@Test
def testCacheEntryExpiry(): Unit = {
val estimateEntryBytesSize = estimateOneEntryBytesSize()
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
cache = new RemoteIndexCache(2, rsm, tpDir.toString)
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)

Expand Down Expand Up @@ -230,7 +217,7 @@ class RemoteIndexCacheTest {
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")

cache = new RemoteIndexCache(2, rsm, tpDir.toString)
cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)

Expand Down Expand Up @@ -430,9 +417,10 @@ class RemoteIndexCacheTest {

@Test
def testReloadCacheAfterClose(): Unit = {
val estimateEntryBytesSize = estimateOneEntryBytesSize()
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
cache = new RemoteIndexCache(2, rsm, tpDir.toString)
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)

Expand Down Expand Up @@ -466,7 +454,7 @@ class RemoteIndexCacheTest {
cache.close()

// Reload the cache from the disk and check the cache size is same as earlier
val reloadedCache = new RemoteIndexCache(2, rsm, tpDir.toString)
val reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
assertEquals(2, reloadedCache.internalCache.asMap().size())
reloadedCache.close()

Expand Down Expand Up @@ -524,6 +512,48 @@ class RemoteIndexCacheTest {
}
}

@Test
def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {

def getIndexFileFromRemoteCacheDir(suffix: String) = {
Files.walk(cache.cacheDir())
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}

val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)

assertCacheSize(0)
// getIndex for first time will call rsm#fetchIndex
val cacheEntry = cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)

cache.resizeCacheSize(1L)

// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after resizing cache.")
TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
"Failed to cleanup cache entry after resizing cache.")

// verify no index files on remote cache dir
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")

assertTrue(cache.internalCache().estimatedSize() == 0)
}

@Test
def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
// create Corrupt Offset Index File
Expand Down Expand Up @@ -616,12 +646,45 @@ class RemoteIndexCacheTest {
}
}

private def estimateOneEntryBytesSize(): Long = {
val tp = new TopicPartition("estimate-entry-bytes-size", 0)
val tpId = new TopicIdPartition(Uuid.randomUuid(), tp)
val tpDir = new File(logDir, tpId.toString)
Files.createDirectory(tpDir.toPath)
val rsm = mock(classOf[RemoteStorageManager])
mockRsmFetchIndex(rsm)
val cache = new RemoteIndexCache(2L, rsm, tpDir.toString)
val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
val entry = cache.getIndexEntry(metadataList.head)
val entrySizeInBytes = entry.entrySizeBytes()
Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size")
entrySizeInBytes
}

private def mockRsmFetchIndex(rsm: RemoteStorageManager): Unit = {
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
}

private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
pw.write("Hello, world")
// The size of the string written in the file is 12 bytes,
// but it should be multiple of Offset Index EntrySIZE which is equal to 8.
pw.close()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicReference
import kafka.controller.KafkaController
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.utils.TestUtils
import kafka.zk.KafkaZkClient
Expand Down Expand Up @@ -787,6 +788,31 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}

@Test
def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
origProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "2")

val config = KafkaConfig(origProps)
val serverMock = Mockito.mock(classOf[KafkaBroker])
val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))

Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)

config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))

val props = new Properties()

props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "4")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(4L, config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)

Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
}

def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
retentionMs: Long,
logLocalRetentionBytes: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
atLeast(0),
LOW,
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
.defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
.define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
LONG,
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
atLeast(1),
Expand Down
Loading

0 comments on commit 1c3eb43

Please sign in to comment.