From 7919e8e0efbd0a2af20b44b87656ad401ce79608 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan
Date: Mon, 28 Aug 2023 15:41:56 -0700
Subject: [PATCH 1/2] Added a timeout on the producer flush call in KafkaMirrorMakerConnectorTask

---
 .../connectors/kafka/KafkaBasedConnectorConfig.java |  9 +++++++++
 .../mirrormaker/KafkaMirrorMakerConnectorTask.java  | 11 ++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
index 363b38223..7eeb88a02 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaBasedConnectorConfig.java
@@ -41,6 +41,9 @@ public class KafkaBasedConnectorConfig {
   // how long will the connector wait for a task to shut down before interrupting the task thread
   private static final String CONFIG_TASK_INTERRUPT_TIMEOUT_MS = "taskKillTimeoutMs";
 
+  // how long will the connector task wait for producer flush before committing safe offsets during hard commit
+  private static final String CONFIG_HARD_COMMIT_FLUSH_TIMEOUT = "hardCommitFlushTimeout";
+
   // config value to enable Kafka partition management for KafkaMirrorConnector
   public static final String ENABLE_PARTITION_ASSIGNMENT = "enablePartitionAssignment";
   public static final long DEFAULT_NON_GOOD_STATE_THRESHOLD_MILLIS = Duration.ofMinutes(10).toMillis();
@@ -56,6 +59,7 @@ public class KafkaBasedConnectorConfig {
   private static final boolean DEFAULT_ENABLE_ADDITIONAL_METRICS = Boolean.TRUE;
   private static final boolean DEFAULT_INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID = Boolean.FALSE;
   private static final long DEFAULT_TASK_INTERRUPT_TIMEOUT_MS = Duration.ofSeconds(75).toMillis();
+  private static final long DEFAULT_HARD_COMMIT_FLUSH_TIMEOUT_MS = Duration.ofSeconds(10).toMillis();
   private static final long POST_TASK_INTERRUPT_TIMEOUT_MS = Duration.ofSeconds(15).toMillis();
 
   private final Properties _consumerProps;
@@ -79,6 +83,7 @@ public class KafkaBasedConnectorConfig {
   private final long _nonGoodStateThresholdMillis;
   private final boolean _enablePartitionAssignment;
   private final long _taskInterruptTimeoutMs;
+  private final long _hardCommitFlushTimeoutMs;
 
   // Kafka based pub sub framework uses Long as their offset type, hence instantiating a Long parameterized factory
   private final CallbackStatusFactory<Long> _callbackStatusStrategyFactory;
@@ -121,6 +126,7 @@ public KafkaBasedConnectorConfig(Properties properties) {
         INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID, DEFAULT_INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID);
     _enablePartitionAssignment = verifiableProperties.getBoolean(ENABLE_PARTITION_ASSIGNMENT, Boolean.FALSE);
     _taskInterruptTimeoutMs = verifiableProperties.getLong(CONFIG_TASK_INTERRUPT_TIMEOUT_MS, DEFAULT_TASK_INTERRUPT_TIMEOUT_MS);
+    _hardCommitFlushTimeoutMs = verifiableProperties.getLong(CONFIG_HARD_COMMIT_FLUSH_TIMEOUT, DEFAULT_HARD_COMMIT_FLUSH_TIMEOUT_MS);
 
     String callbackStatusStrategyFactoryClass = verifiableProperties.getString(CONFIG_CALLBACK_STATUS_STRATEGY_FACTORY_CLASS,
         CallbackStatusWithComparableOffsetsFactory.class.getName());
@@ -228,6 +234,9 @@ public long getPostTaskInterruptTimeoutMs() {
   public long getShutdownExecutorShutdownTimeoutMs() {
     return _taskInterruptTimeoutMs + POST_TASK_INTERRUPT_TIMEOUT_MS;
   }
+  public long getHardCommitFlushTimeoutMs() {
+    return _hardCommitFlushTimeoutMs;
+  }
 
   public CallbackStatusFactory<Long> getCallbackStatusStrategyFactory() {
     return _callbackStatusStrategyFactory;
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
index 153a4e11c..b67554219 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
@@ -16,6 +16,9 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -124,6 +127,7 @@ public class KafkaMirrorMakerConnectorTask extends AbstractKafkaBasedConnectorTa
   // among Kafka consumer client metrics for different datastreams.
   private final boolean _includeDatastreamNameInConsumerClientId;
   private final String _destinationTopicPrefix;
+  private final long _hardCommitFlushTimeoutMs;
   private FlushlessEventProducerHandler<Long> _flushlessProducer = null;
   private boolean _flowControlEnabled = false;
   private long _maxInFlightMessagesThreshold;
@@ -151,6 +155,7 @@ public KafkaMirrorMakerConnectorTask(KafkaBasedConnectorConfig config, Datastrea
     _isIdentityMirroringEnabled = KafkaMirrorMakerDatastreamMetadata.isIdentityPartitioningEnabled(_datastream);
     _enablePartitionAssignment = config.getEnablePartitionAssignment();
     _includeDatastreamNameInConsumerClientId = config.getIncludeDatastreamNameInConsumerClientId();
+    _hardCommitFlushTimeoutMs = config.getHardCommitFlushTimeoutMs();
     _destinationTopicPrefix = task.getDatastreams().get(0).getMetadata()
         .getOrDefault(DatastreamMetadataConstants.DESTINATION_TOPIC_PREFIX, DEFAULT_DESTINATION_TOPIC_PREFIX);
     _dynamicMetricsManager = DynamicMetricsManager.getInstance();
@@ -406,7 +411,11 @@ protected void maybeCommitOffsets(Consumer<?, ?> consumer, boolean hardCommit) {
     if (hardCommit) { // hard commit (flush and commit checkpoints)
       LOG.info("Calling flush on the producer.");
       try {
-        _datastreamTask.getEventProducer().flush();
+        Future producerFlushFuture = Executors.newSingleThreadExecutor().
+            submit(() -> _datastreamTask.getEventProducer().flush());
+        producerFlushFuture.get(_hardCommitFlushTimeoutMs, TimeUnit.MILLISECONDS);
+      } catch (Exception ex) {
+        LOG.warn("Producer flush failed with exception: ", ex);
       } finally {
         // Flushless mode tracks the successfully received acks, so it is safe to commit offsets even if flush throws
         // an exception. Commit the safe offsets to reduce send duplication.
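
Note (not part of the patch): the change above bounds the previously unbounded producer flush by submitting it to an executor and waiting on the returned Future for at most the configured timeout ("hardCommitFlushTimeout", defaulting to 10 seconds). If the wait times out or the flush fails, a warning is logged and the finally block still commits the offsets that the flushless handler has already seen acknowledged. Below is a minimal, self-contained sketch of that pattern; the names BoundedFlushExample and flushWithTimeout are hypothetical and are not Brooklin API.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Hypothetical illustration of the bounded-flush pattern used by the patch.
    public class BoundedFlushExample {
      private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();

      // Run a potentially slow flush on the executor and wait at most timeoutMs.
      // On timeout or failure the caller can still commit already-acked offsets.
      public void flushWithTimeout(Runnable flush, long timeoutMs) {
        try {
          Future<?> flushFuture = flushExecutor.submit(flush);
          flushFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          // The flush keeps running in the background; proceed with safe offsets.
          System.err.println("Flush did not finish within " + timeoutMs + " ms");
        } catch (Exception ex) {
          System.err.println("Flush failed: " + ex);
        }
        // The patched code commits the safe (acknowledged) offsets in a finally block here.
      }

      public static void main(String[] args) {
        BoundedFlushExample example = new BoundedFlushExample();
        // Simulate a flush that exceeds a 100 ms budget.
        example.flushWithTimeout(() -> {
          try {
            Thread.sleep(500);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          }
        }, 100);
        example.flushExecutor.shutdownNow();
      }
    }

Note that neither the patch nor this sketch cancels the Future on timeout, so the flush keeps running on the executor thread. As written, this commit also creates a new single-thread executor on every hard commit and never shuts it down; the follow-up commit below addresses that.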
From bc49ef2990b1e0f44e394def8d5017a30ee48d09 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan
Date: Mon, 28 Aug 2023 16:34:05 -0700
Subject: [PATCH 2/2] Added shutdown for executor service

---
 .../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
index b67554219..e07ffad1f 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
@@ -16,6 +16,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -134,6 +135,7 @@ public class KafkaMirrorMakerConnectorTask extends AbstractKafkaBasedConnectorTa
   private long _minInFlightMessagesThreshold;
   private int _flowControlTriggerCount = 0;
   private int _errorOnSendCallbackDuringShutdownCount = 0;
+  private ExecutorService _producerFlushExecutor;
 
   /**
    * Constructor for KafkaMirrorMakerConnectorTask
@@ -159,6 +161,7 @@ public KafkaMirrorMakerConnectorTask(KafkaBasedConnectorConfig config, Datastrea
     _destinationTopicPrefix = task.getDatastreams().get(0).getMetadata()
         .getOrDefault(DatastreamMetadataConstants.DESTINATION_TOPIC_PREFIX, DEFAULT_DESTINATION_TOPIC_PREFIX);
     _dynamicMetricsManager = DynamicMetricsManager.getInstance();
+    _producerFlushExecutor = Executors.newSingleThreadExecutor();
 
     if (_enablePartitionAssignment) {
       LOG.info("Enable Brooklin partition assignment");
@@ -411,8 +414,7 @@ protected void maybeCommitOffsets(Consumer<?, ?> consumer, boolean hardCommit) {
     if (hardCommit) { // hard commit (flush and commit checkpoints)
       LOG.info("Calling flush on the producer.");
       try {
-        Future producerFlushFuture = Executors.newSingleThreadExecutor().
-            submit(() -> _datastreamTask.getEventProducer().flush());
+        Future producerFlushFuture = _producerFlushExecutor.submit(() -> _datastreamTask.getEventProducer().flush());
         producerFlushFuture.get(_hardCommitFlushTimeoutMs, TimeUnit.MILLISECONDS);
       } catch (Exception ex) {
         LOG.warn("Producer flush failed with exception: ", ex);
@@ -470,6 +472,7 @@ protected void postShutdownHook() {
         }
       }
     }
+    _producerFlushExecutor.shutdown();
   }
 
   /**
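
Note (not part of the patch): the follow-up commit creates the single-thread flush executor once in the constructor, reuses it for every hard commit, and calls shutdown() on it in postShutdownHook(), so hard commits no longer create (and leak) a thread each. The patch itself only calls shutdown(); the sketch below additionally shows a bounded awaitTermination()/shutdownNow() as one possible way to handle a flush that is still in flight at shutdown. All names here are hypothetical, not Brooklin API.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    // Hypothetical lifecycle sketch: create the flush executor once, reuse it
    // for every hard commit, and shut it down when the task stops.
    public class FlushExecutorLifecycle implements AutoCloseable {
      private final ExecutorService producerFlushExecutor = Executors.newSingleThreadExecutor();

      public void hardCommit(Runnable flush, long flushTimeoutMs) {
        try {
          Future<?> flushFuture = producerFlushExecutor.submit(flush);
          flushFuture.get(flushTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception ex) {
          // Timeout or flush failure: log it and fall through to commit safe offsets.
          System.err.println("Producer flush did not complete cleanly: " + ex);
        }
        // ... commit safe offsets here ...
      }

      @Override
      public void close() throws InterruptedException {
        // Rough equivalent of postShutdownHook(): stop accepting new flush tasks.
        producerFlushExecutor.shutdown();
        // Not in the patch: bound the wait for an in-flight flush, then force-stop.
        if (!producerFlushExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
          producerFlushExecutor.shutdownNow();
        }
      }

      public static void main(String[] args) throws Exception {
        try (FlushExecutorLifecycle task = new FlushExecutorLifecycle()) {
          task.hardCommit(() -> System.out.println("flush completed"), 1_000);
        }
      }
    }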