diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index cc0232aef282..37210bb226cf 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -78,7 +78,8 @@ class LogLoader( recoveryPointCheckpoint: Long, leaderEpochCache: Option[LeaderEpochFileCache], producerStateManager: ProducerStateManager, - numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int] + numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int], + isRemoteLogEnabled: Boolean = false, ) extends Logging { logIdent = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] " @@ -180,7 +181,11 @@ class LogLoader( } leaderEpochCache.foreach(_.truncateFromEnd(nextOffset)) - val newLogStartOffset = math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset) + val newLogStartOffset = if (isRemoteLogEnabled) { + logStartOffsetCheckpoint + } else { + math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset) + } // The earliest leader epoch may not be flushed during a hard failure. Recover it here. leaderEpochCache.foreach(_.truncateFromStart(logStartOffsetCheckpoint)) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 10c86183b3f0..c289cf25feff 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -187,12 +187,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } def remoteLogEnabled(): Boolean = { - // Remote log is enabled only for non-compact and non-internal topics - remoteStorageSystemEnable && - !(config.compact || Topic.isInternal(topicPartition.topic()) - || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topicPartition.topic()) - || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topicPartition.topic())) && - config.remoteStorageEnable() + UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic()) } /** @@ -1882,6 +1877,17 @@ object UnifiedLog extends Logging { val UnknownOffset = LocalLog.UnknownOffset + def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean, + config: LogConfig, + topic: String): Boolean = { + // Remote log is enabled only for non-compact and non-internal topics + remoteStorageSystemEnable && + !(config.compact || Topic.isInternal(topic) + || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic) + || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) && + config.remoteStorageEnable() + } + def apply(dir: File, config: LogConfig, logStartOffset: Long, @@ -1911,6 +1917,7 @@ object UnifiedLog extends Logging { s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ") val producerStateManager = new ProducerStateManager(topicPartition, dir, maxTransactionTimeoutMs, producerStateManagerConfig, time) + val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic) val offsets = new LogLoader( dir, topicPartition, @@ -1924,7 +1931,8 @@ object UnifiedLog extends Logging { recoveryPoint, leaderEpochCache, producerStateManager, - numRemainingSegments + numRemainingSegments, + isRemoteLogEnabled, ).load() val localLog = new LocalLog(dir, config, segments, offsets.recoveryPoint, offsets.nextOffsetMetadata, scheduler, time, topicPartition, logDirFailureChannel) diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 64527c707904..13870904ca6b 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -37,6 +37,8 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochE import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource import org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers.{any, anyLong} import org.mockito.Mockito.{mock, reset, times, verify, when} @@ -1753,4 +1755,50 @@ class LogLoaderTest { log.close() } + + @ParameterizedTest + @CsvSource(Array("false, 5", "true, 0")) + def testLogStartOffsetWhenRemoteStorageIsEnabled(isRemoteLogEnabled: Boolean, + expectedLogStartOffset: Long): Unit = { + val logDirFailureChannel = null + val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) + val logConfig = LogTestUtils.createLogConfig() + val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager]) + when(stateManager.isEmpty).thenReturn(true) + + val log = createLog(logDir, logConfig) + // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-] + // |---> logStartOffset |---> active segment (empty) + // |---> logEndOffset + for (i <- 0 until 9) { + val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) + log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) + log.roll() + } + log.maybeIncrementHighWatermark(new LogOffsetMetadata(9L)) + log.maybeIncrementLogStartOffset(5L, LogStartOffsetIncrementReason.SegmentDeletion) + log.deleteOldSegments() + + val segments = new LogSegments(topicPartition) + log.logSegments.foreach(segment => segments.add(segment)) + assertEquals(5, segments.firstSegment.get.baseOffset) + + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "") + val offsets = new LogLoader( + logDir, + topicPartition, + logConfig, + mockTime.scheduler, + mockTime, + logDirFailureChannel, + hadCleanShutdown = true, + segments, + 0L, + 0L, + leaderEpochCache, + stateManager, + isRemoteLogEnabled = isRemoteLogEnabled + ).load() + assertEquals(expectedLogStartOffset, offsets.logStartOffset) + } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java index 593d69cb38c9..99e76293e456 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.tiered.storage; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; @@ -90,15 +92,21 @@ public TieredStorageTestContext(TieredStorageTestHarness harness) { @SuppressWarnings("deprecation") private void initClients() { + // rediscover the new bootstrap-server port incase of broker restarts + ListenerName listenerName = harness.listenerName(); + Properties commonOverrideProps = new Properties(); + commonOverrideProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, harness.bootstrapServers(listenerName)); + // Set a producer linger of 60 seconds, in order to optimistically generate batches of // records with a pre-determined size. Properties producerOverrideProps = new Properties(); producerOverrideProps.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60))); - producer = harness.createProducer(ser, ser, producerOverrideProps); + producerOverrideProps.putAll(commonOverrideProps); - consumer = harness.createConsumer(de, de, new Properties(), + producer = harness.createProducer(ser, ser, producerOverrideProps); + consumer = harness.createConsumer(de, de, commonOverrideProps, JavaConverters.asScalaBuffer(Collections.emptyList()).toList()); - admin = harness.createAdminClient(harness.listenerName(), new Properties()); + admin = harness.createAdminClient(listenerName, commonOverrideProps); } private void initContext() { @@ -228,7 +236,11 @@ public Long beginOffset(TopicPartition topicPartition) { public void bounce(int brokerId) { harness.killBroker(brokerId); + boolean allBrokersDead = harness.aliveBrokers().isEmpty(); harness.startBroker(brokerId); + if (allBrokersDead) { + reinitClients(); + } initContext(); } @@ -238,7 +250,11 @@ public void stop(int brokerId) { } public void start(int brokerId) { + boolean allBrokersDead = harness.aliveBrokers().isEmpty(); harness.startBroker(brokerId); + if (allBrokersDead) { + reinitClients(); + } initContext(); } @@ -310,5 +326,16 @@ public void printReport(PrintStream output) { @Override public void close() throws IOException { + // IntegrationTestHarness closes the clients on tearDown, no need to close them explicitly. + } + + private void reinitClients() { + // Broker uses a random port (TestUtils.RandomPort) for the listener. If the initial bootstrap-server config + // becomes invalid, then the clients won't be able to reconnect to the cluster. + // To avoid this, we reinitialize the clients after all the brokers are bounced. + Utils.closeQuietly(producer, "Producer client"); + Utils.closeQuietly(consumer, "Consumer client"); + Utils.closeQuietly(admin, "Admin client"); + initClients(); } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java index b5da2308d14a..ffb8e666d183 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java @@ -24,7 +24,7 @@ import java.util.Map; /** - * Test Cases (A): + * Test Cases: * Elementary offloads and fetches from tiered storage. */ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness { @@ -46,14 +46,14 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { final Integer p0 = 0; final Integer partitionCount = 1; final Integer replicationFactor = 1; - final Integer maxBatchCountPerSegment = 1; + final Integer oneBatchPerSegment = 1; + final Integer twoBatchPerSegment = 2; final Map> replicaAssignment = null; final boolean enableRemoteLogStorage = true; - final Integer batchSize = 1; builder /* - * (A.1) Create a topic which segments contain only one batch and produce three records + * (1) Create a topic which segments contain only one batch and produce three records * with a batch size of 1. * * The topic and broker are configured so that the two rolled segments are picked from @@ -68,23 +68,23 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { * Log tA-p0 Log tA-p0 * *-------------------* *-------------------* * | base offset = 2 | | base offset = 0 | - * | (k3, v3) | | (k1, v1) | + * | (k2, v2) | | (k0, v0) | * *-------------------* *-------------------* * *-------------------* * | base offset = 1 | - * | (k2, v2) | + * | (k1, v1) | * *-------------------* */ - .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment, + .createTopic(topicA, partitionCount, replicationFactor, oneBatchPerSegment, replicaAssignment, enableRemoteLogStorage) - .withBatchSize(topicA, p0, batchSize) - .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1")) - .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k2", "v2")) - .produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"), - new KeyValueSpec("k3", "v3")) + .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k0", "v0")) + .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k1", "v1")) + .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L) + .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), + new KeyValueSpec("k2", "v2")) /* - * (A.2) Similar scenario as above, but with segments of two records. + * (2) Similar scenario as above, but with segments of two records. * * Acceptance: * ----------- @@ -95,28 +95,27 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { * Log tB-p0 Log tB-p0 * *-------------------* *-------------------* * | base offset = 4 | | base offset = 0 | - * | (k5, v5) | | (k1, v1) | - * *-------------------* | (k2, v2) | + * | (k4, v4) | | (k0, v0) | + * *-------------------* | (k1, v1) | * *-------------------* * *-------------------* * | base offset = 2 | + * | (k2, v2) | * | (k3, v3) | - * | (k4, v4) | * *-------------------* */ - .createTopic(topicB, partitionCount, replicationFactor, 2, replicaAssignment, + .createTopic(topicB, partitionCount, replicationFactor, twoBatchPerSegment, replicaAssignment, enableRemoteLogStorage) - .withBatchSize(topicB, p0, batchSize) .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 4L) .expectSegmentToBeOffloaded(broker, topicB, p0, 0, - new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2")) + new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1")) .expectSegmentToBeOffloaded(broker, topicB, p0, 2, - new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4")) - .produce(topicB, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"), - new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"), new KeyValueSpec("k5", "v5")) + new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3")) + .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), + new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4")) /* - * (A.3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption + * (3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption * from a given offset and b) verify that upon broker start, existing remote log segments * metadata are loaded by Kafka and these log segments available. * @@ -124,10 +123,10 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { * ----------- * - For topic A, this offset is defined such that only the second segment is fetched from * the tiered storage. - * - For topic B, only one segment is present in the tiered storage, as asserted by the + * - For topic B, two segments are present in the tiered storage, as asserted by the * previous sub-test-case. */ - // .bounce(broker) + .bounce(broker) .expectFetchFromTieredStorage(broker, topicA, p0, 1) .consume(topicA, p0, 1L, 2, 1) .expectFetchFromTieredStorage(broker, topicB, p0, 2)