KAFKA-17871: avoid blocking the herder thread when producer flushing hangs #17

Open · wants to merge 37 commits into trunk
Commits (37)
8fa0d97
MINOR: Fix typo in ApiKeyVersionsProvider exception message (#19521)
apalan60 Apr 21, 2025
7710d1c
KAFKA-14487: Move LogManager static methods/fields to storage module …
mimaison Apr 21, 2025
a04c2fe
KAFKA-19180 Fix the hanging testPendingTaskSize (#19526)
Yunyung Apr 21, 2025
ae771d7
KAFKA-8830 make Record Headers available in onAcknowledgement (#17099)
rich-c-shop Apr 21, 2025
18584b1
MINOR: ignore null judgement in LogCleaner (#19524)
gongxuanzhang Apr 21, 2025
a8f4999
KAFKA-19019: Add support for remote storage fetch for share groups (#…
adixitconfluent Apr 21, 2025
4410d35
KAFKA-19179: remove the dot from thread_dump_url (#19525)
gongxuanzhang Apr 22, 2025
070892d
MINOR: redundant gradle expression is removed (#19200)
dejan2609 Apr 22, 2025
11904c7
KAFKA-19166: Fix RC tag in release script (#19518)
mumrah Apr 22, 2025
66147d5
KAFKA-19057: Stabilize KIP-932 RPCs for AK 4.1 (#19378)
AndrewJSchofield Apr 22, 2025
22c5794
KAFKA-19159: Removed time based evictions for share sessions (#19500)
chirag-wadhwa5 Apr 22, 2025
e78e106
MINOR: Improve javadoc for share consumer (#19533)
AndrewJSchofield Apr 22, 2025
5af3547
KAFKA-18572: Update Kafka Streams metric documenation (#18673)
ukpagrace Apr 22, 2025
b97a130
KAFKA-16538; Enable upgrading kraft version for existing clusters (#1…
jsancio Apr 22, 2025
71d0878
KAFKA-14690; Add TopicId to OffsetCommit API (#19461)
dajac Apr 23, 2025
b088ba7
KAFKA-19181: removed assertions in test_share_multiple_partitions as …
chirag-wadhwa5 Apr 23, 2025
4335027
KAFKA-19156: Streamlined share group configs, with usage in ShareSess…
chirag-wadhwa5 Apr 23, 2025
6fe1598
KAFKA-18170: Add scheduled job to snapshot cold share partitions. (#1…
smjn Apr 23, 2025
efd7852
KAFKA-19124: Follow up on code improvements (#19453)
cadonna Apr 23, 2025
8b4560e
KAFKA-15767 Refactor TransactionManager to avoid use of ThreadLocal (…
kirktrue Apr 23, 2025
3fae785
KAFKA-19110: Add missing unit test for Streams-consumer integration (…
FrankYang0529 Apr 24, 2025
f0f5571
MINOR: Change KIP-932 log messages from early access to preview (#19547)
AndrewJSchofield Apr 24, 2025
f4ab3a2
MINOR: Use readable interface to parse response (#19353)
soondenana Apr 24, 2025
039ba5e
MINOR: Improve member epoch mismatch logging for share groups (#19549)
AndrewJSchofield Apr 24, 2025
3c05dfd
KAFKA-18889: Make records in ShareFetchResponse non-nullable (#19536)
apoorvmittal10 Apr 24, 2025
62fe528
KAFKA-19082: [1/4] Add client config for enable2PC and overloaded ini…
rreddy-22 Apr 24, 2025
d6133f6
KAFKA-18988: Connect Multiversion Support (Updates to status and metr…
snehashisp Apr 24, 2025
a948537
MINOR: Small refactor in group coordinator (#19551)
dajac Apr 24, 2025
22b89b6
KAFKA-19192; Old bootstrap checkpoint files cause problems updated se…
cmccabe Apr 24, 2025
36d2498
MINOR: Use meaningful name in AsyncKafkaConsumerTest (#19550)
FrankYang0529 Apr 24, 2025
ee4debb
KAFKA-19128: Kafka Streams should not get offsets when close dirty (#…
aliehsaeedii Apr 25, 2025
e79f5f0
KAFKA-19195: Only send the right group ID subset to each GC shard (#1…
lucasbru Apr 25, 2025
2ce7c44
KAFKA-19198: Resolve NPE when topic assigned in share group is delete…
AndrewJSchofield Apr 25, 2025
732ed06
KAFKA-19190: Handle shutdown application correctly (#19544)
lucasbru Apr 25, 2025
369cc56
KAFKA-17747: [1/N] Add MetadataHash field to Consumer/Share/StreamGro…
FrankYang0529 Apr 25, 2025
0979bb5
KAFKA-17871: avoid blocking the herder thread when producer flushing …
davide-armand Dec 9, 2024
5d8f00c
KAFKA-17871: fixup tests
davide-armand Mar 31, 2025
2 changes: 1 addition & 1 deletion .github/scripts/junit.py
@@ -361,7 +361,7 @@ def split_report_path(base_path: str, report_path: str) -> Tuple[str, str]:
failure_messages.append(f"Gradle task had a failure exit code. Failing this script.")

if thread_dump_url:
failure_messages.append(f"Thread dump available at {thread_dump_url}. Failing this script.")
failure_messages.append(f"Thread dump available at {thread_dump_url} and the script will now fail.")

if junit_report_url:
report_md = f"Download [JUnit HTML report]({junit_report_url})"
3 changes: 1 addition & 2 deletions build.gradle
@@ -369,7 +369,6 @@ subprojects {

// Fix for avoiding inclusion of runtime dependencies marked as 'shadow' in MANIFEST Class-Path.
// https://github.com/GradleUp/shadow/issues/324
afterEvaluate {
pom.withXml { xml ->
def dependenciesNode = xml.asNode().get('dependencies') ?: xml.asNode().appendNode('dependencies')
project.configurations.shadowed.allDependencies.each {
@@ -380,7 +379,6 @@
dependencyNode.appendNode('scope', 'runtime')
}
}
}
}

afterEvaluate {
@@ -2230,6 +2228,7 @@ project(':storage') {
}

dependencies {
implementation project(':metadata')
implementation project(':storage:storage-api')
implementation project(':server-common')
implementation project(':clients')
2 changes: 2 additions & 0 deletions checkstyle/import-control-storage.xml
@@ -94,6 +94,8 @@
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.config" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.storage.log.metrics"/>
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -98,7 +98,7 @@
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaProducerTest).java"/>

<suppress checks="NPathComplexity"
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>
@@ -122,8 +122,7 @@
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1")
},
types = {Type.KRAFT}
)
@@ -1859,8 +1858,7 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
}
)
@Timeout(90)
@@ -2011,8 +2009,7 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
@ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
}
)
@Timeout(150)
@@ -108,7 +108,7 @@ public OffsetCommitRequest.Builder buildBatchedRequest(
.setGroupId(groupId.idValue)
.setTopics(new ArrayList<>(offsetData.values()));

return new OffsetCommitRequest.Builder(data);
return OffsetCommitRequest.Builder.forTopicNames(data);
}

@Override
@@ -99,8 +99,7 @@ DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<Coo
return new DeleteShareGroupOffsetsRequest.Builder(
new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId.idValue)
.setTopics(topics),
true
.setTopics(topics)
);
}

@@ -92,7 +92,7 @@ public ShareGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId,
ShareGroupDescribeRequestData data = new ShareGroupDescribeRequestData()
.setGroupIds(groupIds)
.setIncludeAuthorizedOperations(includeAuthorizedOperations);
return new ShareGroupDescribeRequest.Builder(data, true);
return new ShareGroupDescribeRequest.Builder(data);
}

@Override
@@ -104,7 +104,7 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina
});
DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData()
.setGroups(groups);
return new DescribeShareGroupOffsetsRequest.Builder(data, true);
return new DescribeShareGroupOffsetsRequest.Builder(data);
}

@Override
@@ -51,8 +51,7 @@
/**
* A client that consumes records from a Kafka cluster using a share group.
* <p>
* <em>This is an early access feature under development which is introduced by KIP-932.
* It is not suitable for production use until it is fully implemented and released.</em>
* <em>This is a preview feature introduced by KIP-932. It is not yet recommended for production use.</em>
*
* <h3>Cross-Version Compatibility</h3>
* This client can communicate with brokers that are a version that supports share groups. You will receive an
@@ -100,7 +99,7 @@
* of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a
* time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default,
* the lock duration is 30 seconds, but it can also be controlled using the group {@code group.share.record.lock.duration.ms}
* configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and
* configuration property. The idea is that the lock is automatically released once the lock duration has elapsed, and
* then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in
* the following ways:
* <ul>
@@ -116,8 +115,8 @@
* {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
* <p>
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the
* consumer {@code share.acknowledgement.mode} property.
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes by using the
* consumer {@code share.acknowledgement.mode} configuration property.
* <p>
* If the application sets the property to "implicit" or does not set it at all, then the consumer is using
* <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by:
@@ -129,7 +128,7 @@
* the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
* <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li>
* </ul>
* If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgment</em>.
* If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgement</em>.
* The application must acknowledge all records returned from {@link #poll(Duration)} using
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}.
* If the application calls {@link #poll(Duration)} without having acknowledged all records, an
@@ -162,6 +161,7 @@
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
*
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
@@ -181,6 +181,7 @@
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
*
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
@@ -203,6 +204,7 @@
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;share.acknowledgement.mode&quot;, &quot;explicit&quot;);
*
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
Expand Down Expand Up @@ -443,7 +445,7 @@ public void unsubscribe() {
}

/**
* Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
* Deliver records for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
* subscribed to any topics before polling for data.
*
* <p>
Expand All @@ -452,13 +454,14 @@ public void unsubscribe() {
*
* @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
*
* @return map of topic to records since the last fetch for the subscribed list of topics
* @return map of topic to records
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException if caller lacks Read access to any of the subscribed
* topics or to the share group. See the exception for more details
* @throws IllegalArgumentException if the timeout value is negative
* @throws IllegalStateException if the consumer is not subscribed to any topics
* @throws IllegalStateException if the consumer is not subscribed to any topics, or it is using
* explicit acknowledgement and has not acknowledged all records previously delivered
* @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
* @throws InvalidTopicException if the current subscription contains any invalid
* topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
@@ -475,11 +478,12 @@ public ConsumerRecords<K, V> poll(Duration timeout) {
* Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
* The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
* {@link #poll(Duration)} call.
* <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
*
* @param record The record to acknowledge
*
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
* used implicit acknowledgement
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
* explicit acknowledgement
*/
@Override
public void acknowledge(ConsumerRecord<K, V> record) {
@@ -489,14 +493,14 @@ public void acknowledge(ConsumerRecord<K, V> record) {
/**
* Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
* it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
* {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
* <b>explicit acknowledgement</b>.
* {@link #commitAsync()} or {@link #poll(Duration)} call.
* <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
*
* @param record The record to acknowledge
* @param type The acknowledgement type which indicates whether it was processed successfully
*
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
* used implicit acknowledgement
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
* explicit acknowledgement
*/
@Override
public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
@@ -585,7 +589,7 @@ public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callb
* client to complete the request.
* <p>
* Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
* configuration option.
* configuration property.
*
* @param timeout The maximum time to wait for consumer client to determine its client instance ID.
* The value must be non-negative. Specifying a timeout of zero means do not
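The KafkaShareConsumer javadoc hunks above describe implicit and explicit acknowledgement, but the explicit-mode example loop is truncated in this diff view. The following is a minimal sketch, not part of the PR, of how such a loop might look, assuming the KIP-932 AcknowledgeType values ACCEPT and REJECT and a hypothetical process() step:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

public class ExplicitAckSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.setProperty("group.id", "test");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("share.acknowledgement.mode", "explicit");

        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("foo"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        process(record);                                  // hypothetical processing step
                        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                    } catch (Exception e) {
                        // REJECT marks the record as unprocessable so it is not redelivered
                        consumer.acknowledge(record, AcknowledgeType.REJECT);
                    }
                }
                // In explicit mode, every record returned by poll() must be acknowledged
                // before the next poll(); commitSync() commits the acknowledgements.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
    }
}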
@@ -186,7 +186,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
*/
private class BackgroundEventProcessor implements EventProcessor<BackgroundEvent> {

private Optional<StreamsRebalanceListener> streamsGroupRebalanceCallbacks = Optional.empty();
private Optional<StreamsRebalanceListener> streamsRebalanceListener = Optional.empty();
private final Optional<StreamsRebalanceData> streamsRebalanceData;

public BackgroundEventProcessor() {
@@ -202,7 +202,7 @@ private void setStreamsRebalanceListener(final StreamsRebalanceListener streamsR
throw new IllegalStateException("Background event processor was not created to be used with Streams " +
"rebalance protocol events");
}
this.streamsGroupRebalanceCallbacks = Optional.of(streamsRebalanceListener);
this.streamsRebalanceListener = Optional.of(streamsRebalanceListener);
}

@Override
@@ -277,20 +277,15 @@ private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT

private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke,
final CompletableFuture<Void> future) {
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = streamsGroupRebalanceCallbacks().onTasksRevoked(activeTasksToRevoke);
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task revocation callback throws an error"));
} else {
error = Optional.empty();
}
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
final Optional<KafkaException> error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws an error"));
return new StreamsOnTasksRevokedCallbackCompletedEvent(future, error);
}

private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment,
final CompletableFuture<Void> future) {
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = streamsGroupRebalanceCallbacks().onTasksAssigned(assignment);
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onTasksAssigned(assignment);
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error"));
} else {
@@ -302,7 +297,7 @@ private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallba

private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = streamsGroupRebalanceCallbacks().onAllTasksLost();
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onAllTasksLost();
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "All tasks lost callback throws an error"));
} else {
@@ -318,8 +313,8 @@ private StreamsRebalanceData streamsRebalanceData() {
"rebalance protocol events"));
}

private StreamsRebalanceListener streamsGroupRebalanceCallbacks() {
return streamsGroupRebalanceCallbacks.orElseThrow(
private StreamsRebalanceListener streamsRebalanceListener() {
return streamsRebalanceListener.orElseThrow(
() -> new IllegalStateException("Background event processor was not created to be used with Streams " +
"rebalance protocol events"));
}
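The AsyncKafkaConsumer hunks above also collapse an isPresent()/get() branch into a single Optional.map call when wrapping the callback's exception. A standalone sketch of that idiom, using purely hypothetical names:

import java.util.Optional;

public class OptionalMapSketch {
    // Stand-in for a rebalance callback that may report an error (hypothetical).
    static Optional<Exception> onTasksRevoked() {
        return Optional.of(new Exception("callback failed"));
    }

    public static void main(String[] args) {
        // Before: explicit isPresent()/get() branching to wrap the exception.
        Optional<Exception> fromCallback = onTasksRevoked();
        Optional<RuntimeException> errorOld;
        if (fromCallback.isPresent()) {
            errorOld = Optional.of(new RuntimeException("callback error", fromCallback.get()));
        } else {
            errorOld = Optional.empty();
        }

        // After: Optional.map applies the wrapping only when a value is present.
        Optional<RuntimeException> errorNew = onTasksRevoked()
                .map(e -> new RuntimeException("callback error", e));

        System.out.println(errorOld.isPresent() + " " + errorNew.isPresent());
    }
}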