Skip to content

Commit

Permalink
Add option to specify timeout for how long to wait for blocking Produ…
Browse files Browse the repository at this point in the history
…cer#send
  • Loading branch information
JorgenRingen committed Jun 23, 2021
1 parent 9bf2798 commit 490082b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,19 @@ public enum CommitMode {
@Builder.Default
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);

/**
* Controls how long to block while waiting for the {@link Producer#send} to complete for any ProducerRecords
* returned from the user-function. Only relevant if using one of the produce-flows and providing a
* {@link ParallelConsumerOptions#producer}. If the timeout occurs the record will be re-processed in the user-function.
*
* Consider aligning the value with the {@link ParallelConsumerOptions#producer}-options to avoid unnecessary re-processing
* and duplicates on slow {@link Producer#send} calls.
*
* @see org.apache.kafka.clients.producer.ProducerConfig#DELIVERY_TIMEOUT_MS_CONFIG
*/
@Builder.Default
private final Duration sendTimeout = Duration.ofSeconds(2);

public void validate() {
Objects.requireNonNull(consumer, "A consumer must be supplied");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -42,7 +43,6 @@ public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> impleme
private Field txManagerField;
private Method txManagerMethodIsCompleting;
private Method txManagerMethodIsReady;
private final long sendTimeoutSeconds = 2L;

public ProducerManager(final Producer<K, V> newProducer, final ConsumerManager<K, V> newConsumer, final WorkManager<K, V> wm, ParallelConsumerOptions options) {
super(newConsumer, wm);
Expand Down Expand Up @@ -138,7 +138,7 @@ RecordMetadata produceMessage(ProducerRecord<K, V> outMsg) {
try {
log.trace("Blocking on produce result");
RecordMetadata recordMetadata = TimeUtils.time(() ->
send.get(sendTimeoutSeconds, TimeUnit.SECONDS));
send.get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS));
log.trace("Produce result received");
return recordMetadata;
} catch (Exception e) {
Expand Down

0 comments on commit 490082b

Please sign in to comment.