Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a timeout on the producer flush call in KafkaMirrorMakerConnectorTask #957

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class KafkaBasedConnectorConfig {
// how long will the connector wait for a task to shut down before interrupting the task thread
private static final String CONFIG_TASK_INTERRUPT_TIMEOUT_MS = "taskKillTimeoutMs";

// how long will the connector task wait for producer flush before committing safe offsets during hard commit
private static final String CONFIG_HARD_COMMIT_FLUSH_TIMEOUT = "hardCommitFlushTimeout";

// config value to enable Kafka partition management for KafkaMirrorConnector
public static final String ENABLE_PARTITION_ASSIGNMENT = "enablePartitionAssignment";
public static final long DEFAULT_NON_GOOD_STATE_THRESHOLD_MILLIS = Duration.ofMinutes(10).toMillis();
Expand All @@ -56,6 +59,7 @@ public class KafkaBasedConnectorConfig {
private static final boolean DEFAULT_ENABLE_ADDITIONAL_METRICS = Boolean.TRUE;
private static final boolean DEFAULT_INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID = Boolean.FALSE;
private static final long DEFAULT_TASK_INTERRUPT_TIMEOUT_MS = Duration.ofSeconds(75).toMillis();
private static final long DEFAULT_HARD_COMMIT_FLUSH_TIMEOUT_MS = Duration.ofSeconds(10).toMillis();
private static final long POST_TASK_INTERRUPT_TIMEOUT_MS = Duration.ofSeconds(15).toMillis();

private final Properties _consumerProps;
Expand All @@ -79,6 +83,7 @@ public class KafkaBasedConnectorConfig {
private final long _nonGoodStateThresholdMillis;
private final boolean _enablePartitionAssignment;
private final long _taskInterruptTimeoutMs;
private final long _hardCommitFlushTimeoutMs;

// Kafka based pub sub framework uses Long as their offset type, hence instantiating a Long parameterized factory
private final CallbackStatusFactory<Long> _callbackStatusStrategyFactory;
Expand Down Expand Up @@ -121,6 +126,7 @@ public KafkaBasedConnectorConfig(Properties properties) {
INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID, DEFAULT_INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID);
_enablePartitionAssignment = verifiableProperties.getBoolean(ENABLE_PARTITION_ASSIGNMENT, Boolean.FALSE);
_taskInterruptTimeoutMs = verifiableProperties.getLong(CONFIG_TASK_INTERRUPT_TIMEOUT_MS, DEFAULT_TASK_INTERRUPT_TIMEOUT_MS);
_hardCommitFlushTimeoutMs = verifiableProperties.getLong(CONFIG_HARD_COMMIT_FLUSH_TIMEOUT, DEFAULT_HARD_COMMIT_FLUSH_TIMEOUT_MS);

String callbackStatusStrategyFactoryClass = verifiableProperties.getString(CONFIG_CALLBACK_STATUS_STRATEGY_FACTORY_CLASS,
CallbackStatusWithComparableOffsetsFactory.class.getName());
Expand Down Expand Up @@ -228,6 +234,9 @@ public long getPostTaskInterruptTimeoutMs() {
public long getShutdownExecutorShutdownTimeoutMs() {
return _taskInterruptTimeoutMs + POST_TASK_INTERRUPT_TIMEOUT_MS;
}
public long getHardCommitFlushTimeoutMs() {
return _hardCommitFlushTimeoutMs;
}

public CallbackStatusFactory<Long> getCallbackStatusStrategyFactory() {
return _callbackStatusStrategyFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -124,12 +128,14 @@ public class KafkaMirrorMakerConnectorTask extends AbstractKafkaBasedConnectorTa
// among Kafka consumer client metrics for different datastreams.
private final boolean _includeDatastreamNameInConsumerClientId;
private final String _destinationTopicPrefix;
private final long _hardCommitFlushTimeoutMs;
private FlushlessEventProducerHandler<Long> _flushlessProducer = null;
private boolean _flowControlEnabled = false;
private long _maxInFlightMessagesThreshold;
private long _minInFlightMessagesThreshold;
private int _flowControlTriggerCount = 0;
private int _errorOnSendCallbackDuringShutdownCount = 0;
private ExecutorService _producerFlushExecutor;

/**
* Constructor for KafkaMirrorMakerConnectorTask
Expand All @@ -151,9 +157,11 @@ public KafkaMirrorMakerConnectorTask(KafkaBasedConnectorConfig config, Datastrea
_isIdentityMirroringEnabled = KafkaMirrorMakerDatastreamMetadata.isIdentityPartitioningEnabled(_datastream);
_enablePartitionAssignment = config.getEnablePartitionAssignment();
_includeDatastreamNameInConsumerClientId = config.getIncludeDatastreamNameInConsumerClientId();
_hardCommitFlushTimeoutMs = config.getHardCommitFlushTimeoutMs();
_destinationTopicPrefix = task.getDatastreams().get(0).getMetadata()
.getOrDefault(DatastreamMetadataConstants.DESTINATION_TOPIC_PREFIX, DEFAULT_DESTINATION_TOPIC_PREFIX);
_dynamicMetricsManager = DynamicMetricsManager.getInstance();
_producerFlushExecutor = Executors.newSingleThreadExecutor();

if (_enablePartitionAssignment) {
LOG.info("Enable Brooklin partition assignment");
Expand Down Expand Up @@ -406,7 +414,10 @@ protected void maybeCommitOffsets(Consumer<?, ?> consumer, boolean hardCommit) {
if (hardCommit) { // hard commit (flush and commit checkpoints)
LOG.info("Calling flush on the producer.");
try {
_datastreamTask.getEventProducer().flush();
Future<?> producerFlushFuture = _producerFlushExecutor.submit(() -> _datastreamTask.getEventProducer().flush());
producerFlushFuture.get(_hardCommitFlushTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
LOG.warn("Producer flush failed with exception: ", ex);
} finally {
// Flushless mode tracks the successfully received acks, so it is safe to commit offsets even if flush throws
// an exception. Commit the safe offsets to reduce send duplication.
Expand Down Expand Up @@ -461,6 +472,7 @@ protected void postShutdownHook() {
}
}
}
_producerFlushExecutor.shutdown();
}

/**
Expand Down