diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 64a3af83e6571..38dd7ad8abb0b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -75,7 +75,6 @@ import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils; import org.apache.kafka.common.utils.AppInfoParser; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -256,7 +255,7 @@ public class KafkaProducer implements Producer { private final ProducerMetadata metadata; private final RecordAccumulator accumulator; private final Sender sender; - private final Thread ioThread; + private final Sender.SenderThread ioThread; private final Compression compression; private final Sensor errors; private final Time time; @@ -454,7 +453,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali this.errors = this.metrics.sensor("errors"); this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; - this.ioThread = new KafkaThread(ioThreadName, this.sender, true); + this.ioThread = new Sender.SenderThread(ioThreadName, this.sender, true); this.ioThread.start(); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); @@ -480,7 +479,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali ProducerInterceptors interceptors, Partitioner partitioner, Time time, - KafkaThread ioThread, + Sender.SenderThread ioThread, Optional clientTelemetryReporter) { this.producerConfig = config; this.time = time; diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 614fe562d873e..6739facfc3415 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -234,9 +235,6 @@ private boolean hasPendingTransactionalRequests() { public void run() { log.debug("Starting Kafka producer I/O thread."); - if (transactionManager != null) - transactionManager.setPoisonStateOnInvalidTransition(true); - // main loop, runs until close is called while (running) { try { @@ -1072,4 +1070,10 @@ void recordBatchSplit() { } } + public static class SenderThread extends KafkaThread { + + public SenderThread(final String name, Runnable runnable, boolean daemon) { + super(name, runnable, daemon); + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c78134c72ecf2..b52d5d4836d6c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -120,58 +120,6 @@ public class TransactionManager { private final Set newPartitionsInTransaction; private final Set pendingPartitionsInTransaction; private final Set partitionsInTransaction; - - /** - * During its normal course of operations, the transaction manager transitions through different internal - * states (i.e. 
by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions - * result from actions on one of the following classes of threads: - * - *
    - *
  • Application threads that invokes {@link Producer} API calls
  • - *
  • {@link Sender} thread operations
  • - *
- * - * When an invalid state transition is detected during execution on an application thread, the - * {@link #currentState} is not updated and an {@link IllegalStateException} is thrown. This gives the - * application the opportunity to fix the issue without permanently poisoning the state of the - * transaction manager. The {@link Producer} API calls that perform a state transition include: - * - *
    - *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
  • - *
  • {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
  • - *
  • {@link Producer#commitTransaction()}} calls {@link #beginCommit()}
  • - *
  • {@link Producer#abortTransaction()} calls {@link #beginAbort()} - *
  • - *
  • {@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls - * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} - *
  • - *
  • {@link Producer#send(ProducerRecord)} (and its variants) calls - * {@link #maybeAddPartition(TopicPartition)} and - * {@link #maybeTransitionToErrorState(RuntimeException)} - *
  • - *
- * - *

- * - * The {@link Producer} is implemented such that much of its work delegated to and performed asynchronously on the - * {@link Sender} thread. This includes record batching, network I/O, broker response handlers, etc. If an - * invalid state transition is detected in the {@link Sender} thread, in addition to throwing an - * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its - * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. - * - *

- * - * It's important to prevent possible corruption when the transaction manager has determined that it is in a - * fatal state. Subsequent transaction operations attempted via either the application or the - * {@link Sender} thread should fail. This is achieved when these operations invoke the - * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated - * transactional guarantees are not violated. - * - *

- * - * See KAFKA-14831 for more detail. - */ - private final ThreadLocal shouldPoisonStateOnInvalidTransition; private PendingStateTransition pendingTransition; // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. @@ -265,7 +213,6 @@ public TransactionManager(final LogContext logContext, this.newPartitionsInTransaction = new HashSet<>(); this.pendingPartitionsInTransaction = new HashSet<>(); this.partitionsInTransaction = new HashSet<>(); - this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false); this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority)); this.pendingTxnOffsetCommits = new HashMap<>(); this.partitionsWithUnresolvedSequences = new HashMap<>(); @@ -275,8 +222,61 @@ public TransactionManager(final LogContext logContext, this.apiVersions = apiVersions; } - void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) { - shouldPoisonStateOnInvalidTransition.set(shouldPoisonState); + /** + * During its normal course of operations, the transaction manager transitions through different internal + * states (i.e. by updating {@link #currentState}) to one of those defined in {@link State}. These state transitions + * result from actions on one of the following classes of threads: + * + *

    + *
  • Application threads that invoke {@link Producer} API calls
  • + *
  • {@link Sender} thread operations
  • + *
+ * + * When an invalid state transition is detected during execution on an application thread, the + * {@link #currentState} is not updated and an {@link IllegalStateException} is thrown. This gives the + * application the opportunity to fix the issue without permanently poisoning the state of the + * transaction manager. The {@link Producer} API calls that perform a state transition include: + * + *
    + *
  • {@link Producer#initTransactions()} calls {@link #initializeTransactions()}
  • + *
  • {@link Producer#beginTransaction()} calls {@link #beginTransaction()}
  • + *
  • {@link Producer#commitTransaction()} calls {@link #beginCommit()}
  • + *
  • {@link Producer#abortTransaction()} calls {@link #beginAbort()} + *
  • + *
  • {@link Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} calls + * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} + *
  • + *
  • {@link Producer#send(ProducerRecord)} (and its variants) calls + * {@link #maybeAddPartition(TopicPartition)} and + * {@link #maybeTransitionToErrorState(RuntimeException)} + *
  • + *
+ * + *

+ * + * The {@link Producer} is implemented such that much of its work is delegated to and performed asynchronously on the + * {@link Sender} thread. This includes record batching, network I/O, broker response handlers, etc. If an + * invalid state transition is detected in the {@link Sender} thread, in addition to throwing an + * {@link IllegalStateException}, the transaction manager intentionally "poisons" itself by setting its + * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which it cannot recover. + * + *

+ * + * It's important to prevent possible corruption when the transaction manager has determined that it is in a + * fatal state. Subsequent transaction operations attempted via either the application or the + * {@link Sender} thread should fail. This is achieved when these operations invoke the + * {@link #maybeFailWithError()} method, as it causes a {@link KafkaException} to be thrown, ensuring the stated + * transactional guarantees are not violated. + * + *

+ * + * See KAFKA-14831 for more detail. + * + * @return {@code true} to set state to {@link State#FATAL_ERROR} before throwing an exception, + * {@code false} to throw an exception without first changing the state + */ + protected boolean shouldPoisonStateOnInvalidTransition() { + return Thread.currentThread() instanceof Sender.SenderThread; } public synchronized TransactionalRequestResult initializeTransactions() { @@ -1063,7 +1063,7 @@ private void transitionTo(State target, RuntimeException error) { String message = idString + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name(); - if (shouldPoisonStateOnInvalidTransition.get()) { + if (shouldPoisonStateOnInvalidTransition()) { currentState = State.FATAL_ERROR; lastError = new IllegalStateException(message); throw lastError; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index fbb3484a03f7f..12782f5b1242d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -85,7 +85,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -2542,7 +2541,7 @@ private static class KafkaProducerTestContext { private final Map configs; private final Serializer serializer; private final Partitioner partitioner = mock(Partitioner.class); - private final KafkaThread ioThread = mock(KafkaThread.class); + private final Sender.SenderThread senderThread = mock(Sender.SenderThread.class); 
private final List> interceptors = new ArrayList<>(); private ProducerMetadata metadata = mock(ProducerMetadata.class); private RecordAccumulator accumulator = mock(RecordAccumulator.class); @@ -2623,7 +2622,7 @@ public KafkaProducer newKafkaProducer() { interceptors, partitioner, time, - ioThread, + senderThread, Optional.empty() ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 8b4decfb9598c..628f7c8570e2b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -149,7 +149,7 @@ public class TransactionManagerTest { private RecordAccumulator accumulator = null; private Sender sender = null; - private TransactionManager transactionManager = null; + private TestableTransactionManager transactionManager = null; private Node brokerNode = null; private long finalizedFeaturesEpoch = 0; @@ -188,7 +188,7 @@ private void initializeTransactionManager(Optional transactionalId, bool .setMinVersionLevel(transactionV2Enabled ? 
(short) 2 : (short) 1)), finalizedFeaturesEpoch)); finalizedFeaturesEpoch += 1; - this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), + this.transactionManager = new TestableTransactionManager(logContext, transactionalId.orElse(null), transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); int batchSize = 16 * 1024; @@ -1038,7 +1038,7 @@ public void testTransactionManagerDisablesV2() { .setMaxVersionLevel((short) 1) .setMinVersionLevel((short) 1)), 0)); - this.transactionManager = new TransactionManager(logContext, transactionalId, + this.transactionManager = new TestableTransactionManager(logContext, transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions); int batchSize = 16 * 1024; @@ -3799,10 +3799,11 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t @Test public void testBackgroundInvalidStateTransitionIsFatal() { + initializeTransactionManager(Optional.of(transactionalId), true); doInitTransactions(); assertTrue(transactionManager.isTransactional()); - transactionManager.setPoisonStateOnInvalidTransition(true); + transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true); // Intentionally perform an operation that will cause an invalid state transition. The detection of this // will result in a poisoning of the transaction manager for all subsequent transactional operations since @@ -4373,4 +4374,31 @@ private void runUntil(Supplier condition) { ProducerTestUtils.runUntil(sender, condition); } + /** + * This subclass exists only to optionally change the default behavior related to poisoning the state + * on invalid state transition attempts. 
+ */ + private static class TestableTransactionManager extends TransactionManager { + + private Optional shouldPoisonStateOnInvalidTransitionOverride; + + public TestableTransactionManager(LogContext logContext, + String transactionalId, + int transactionTimeoutMs, + long retryBackoffMs, + ApiVersions apiVersions) { + super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions); + this.shouldPoisonStateOnInvalidTransitionOverride = Optional.empty(); + } + + private void setShouldPoisonStateOnInvalidTransitionOverride(boolean override) { + shouldPoisonStateOnInvalidTransitionOverride = Optional.of(override); + } + + @Override + protected boolean shouldPoisonStateOnInvalidTransition() { + // If there's an override, use it, otherwise invoke the default (i.e. super class) logic. + return shouldPoisonStateOnInvalidTransitionOverride.orElseGet(super::shouldPoisonStateOnInvalidTransition); + } + } }