Postpone removing records until assignment is made. #1053
base: series/3.x
Conversation
Force-pushed from d61250d to 147acd7.
@@ -483,6 +489,7 @@ private[kafka] object KafkaConsumerActor {
   type StreamId = Int

   final case class State[F[_], K, V](
+    assignments: Set[TopicPartition],
I did it this way because I experienced a deadlock when I tried to do it with `withConsumer(_.assignments().toSet)` in `onAssignedPartitions` invocations.
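For illustration, here is a minimal sketch of the approach being discussed: the rebalance callbacks only update state held in a `Ref`, and the poll path reads the tracked assignment from that state instead of calling back into the consumer. This is not the actual `KafkaConsumerActor` code; all names here are hypothetical.

```scala
import cats.effect.{IO, Ref}
import org.apache.kafka.common.TopicPartition

// Hypothetical, simplified model: the rebalance callbacks only update a Ref-held
// state, and the poll path reads the tracked assignment from that state instead of
// querying consumer.assignment() (the call that led to the deadlock mentioned above).
final case class TrackedState(assignments: Set[TopicPartition])

final class AssignmentTracker(ref: Ref[IO, TrackedState]) {

  // Invoked from the rebalance listener with the newly assigned partitions.
  def onAssigned(assigned: Set[TopicPartition]): IO[Unit] =
    ref.update(st => st.copy(assignments = st.assignments ++ assigned))

  // Invoked from the rebalance listener with the revoked partitions.
  def onRevoked(revoked: Set[TopicPartition]): IO[Unit] =
    ref.update(st => st.copy(assignments = st.assignments -- revoked))

  // The poll path reads the tracked assignment rather than asking the consumer.
  def currentAssignment: IO[Set[TopicPartition]] =
    ref.get.map(_.assignments)
}
```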
Force-pushed from 147acd7 to bf68fd7.
for {
  action <- ref.modify { state =>
    removeRevokedRecords.run(
      state
        .withRebalancing(false)
        .withAssignments(assigned)
    )
  }
  updatedState <- ref.get
  _ <- action >>
    log(AssignedPartitions(assigned, updatedState)) >>
    updatedState.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned))
} yield ()
Instead of a separate `ref.get`, can we return the new state from `ref.modify`:
Suggested change:

ref
  .modify { state =>
    removeRevokedRecords
      .run(
        state
          .withRebalancing(false)
          .withAssignments(assigned)
      )
      .map(stateAndAction => stateAndAction._1 -> stateAndAction)
  }
  .flatMap {
    case (updatedState, action) =>
      action >> log(AssignedPartitions(assigned, updatedState)) >>
        updatedState.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned))
  }
Yeah, I didn't notice it could be done this way.
Great, thanks! 👍
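As a side note, here is a generic, self-contained sketch of the pattern behind that suggestion (not code from the PR): `Ref#modify` can return the updated state together with the action, avoiding a separate `ref.get` that might observe a state already changed by another fiber.

```scala
import cats.effect.{IO, Ref}

// Generic sketch of the Ref#modify pattern suggested above: the first element of the
// returned tuple becomes the new Ref contents, the second is handed back to the caller,
// so the caller sees exactly the state it just wrote.
def bump(ref: Ref[IO, Int]): IO[Unit] =
  ref
    .modify { state =>
      val updated = state + 1
      val action  = IO.println(s"state is now $updated")
      (updated, (updated, action))
    }
    .flatMap { case (updatedState, action) =>
      action *> IO.println(s"observed $updatedState without a second ref.get")
    }
```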
private[this] def assigned(assigned: SortedSet[TopicPartition]): F[Unit] = {
  def withState[A] = StateT.apply[Id, State[F, K, V], A](_)

  val removeRevokedRecords = withState { st =>
Based on the documentation, the `assigned` parameter only includes newly added partitions, so there is no need to revoke anything. It appears to be an unnecessary sanity check.
First, the RebalanceListener calls a method to revoke partitions. Previously, the records of the revoked partitions were removed in that step. The problem was the combination of the greedy (eager) rebalance protocol and paused partitions. Strangely, when paused partitions are resumed, fetching does not restart from the latest committed offset; it restarts from the locally saved position. This caused the following problem: the records of paused partitions were revoked here, and when fetching restarted, it continued from the offset of the latest removed record + 1. Fortunately, this issue could be mitigated with the cooperative protocol, whose RebalanceListener callbacks receive only the actually revoked and only the newly assigned partitions, whereas under the greedy protocol all partitions are revoked and all partitions are reassigned, discarding the pre-rebalance local state.
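For reference, a sketch of how one might opt into the cooperative protocol mentioned above, assuming fs2-kafka's `ConsumerSettings` API; the bootstrap servers and group id are placeholders.

```scala
import cats.effect.IO
import fs2.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

// Illustrative configuration only: selecting the CooperativeStickyAssignor switches the
// consumer group to the cooperative rebalance protocol, so the rebalance listener sees
// only the actually revoked / newly assigned partitions instead of the full assignment.
val cooperativeSettings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    .withGroupId("TestGroup")
    .withProperties(
      ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG ->
        classOf[CooperativeStickyAssignor].getName
    )
```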
The only thing I still have to recall is why I rely only on the `assigned` parameter. It was a long time ago and I would need to dive deep into it again.
That's a very valuable piece of information. Could you please add it as a comment somewhere? And is it possible to prove this behavior with a test?
Okay, I took another look.
Łukasz is right about assigned partitions - the assigned callback is invoked only with newly assigned partitions (coming from https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L417):
SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
ownedPartitions.addAll(subscriptions.assignedPartitions());
// should at least encode the short version
if (assignmentBuffer.remaining() < 2)
throw new IllegalStateException("There are insufficient bytes available to read assignment from the sync-group response (" +
"actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; " +
"it is possible that the leader's assign function is buggy and did not return any assignment for this member, " +
"or because static member is configured and the protocol is buggy hence did not get the assignment for this member");
//Some code...
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR);
assignedPartitions.addAll(assignment.partitions());
//Some code...
final AtomicReference<Exception> firstException = new AtomicReference<>(null);
SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
addedPartitions.addAll(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);
if (protocol == RebalanceProtocol.COOPERATIVE) {
SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
revokedPartitions.addAll(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
log.info("Updating assignment with\n" +
"\tAssigned partitions: {}\n" +
"\tCurrent owned partitions: {}\n" +
"\tAdded partitions (assigned - owned): {}\n" +
"\tRevoked partitions (owned - assigned): {}\n",
assignedPartitions,
ownedPartitions,
addedPartitions,
revokedPartitions
);
//..more code
but more magic happens when the revoke callback is invoked (https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L777):
switch (protocol) {
case EAGER:
// revoke all partitions
revokedPartitions.addAll(subscriptions.assignedPartitions());
exception = invokePartitionsRevoked(revokedPartitions);
subscriptions.assignFromSubscribed(Collections.emptySet());
break;
case COOPERATIVE:
// only revoke those partitions that are not in the subscription any more.
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
revokedPartitions.addAll(ownedPartitions.stream()
.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
.collect(Collectors.toSet()));
if (!revokedPartitions.isEmpty()) {
exception = invokePartitionsRevoked(revokedPartitions);
ownedPartitions.removeAll(revokedPartitions);
subscriptions.assignFromSubscribed(ownedPartitions);
}
break;
}
> And is it possible to prove this behavior with a test?
It is really hard to reproduce. It mainly happens when a small amount of resources is assigned compared to the workload that must be consumed.
The behaviour is as follows for paused partitions:
//pod-2 has been started. Consumer is being prepared...
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000028, offsetEpoch=Optional.empty, ...}
//pod-1 has been shutdown. Rebalance has started again and now all partitions are assigned to pod-2
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000032, offsetEpoch=Optional.empty, ...}
Published record: topic-59:1000037 //Offset is not moved back to 1000032, but it should be, since Range (EAGER) is used
// Commit happens, setting offset to 1000038.
Published record: topic-59:1000040 //Missed records: 1000038 and 1000039, which exist in the topic but were skipped by the consumer.
Published record: topic-59:1000041
Published record: topic-59:1000042
val withRecords = records intersect revokedFetches
val withoutRecords = revokedFetches diff records

(for {
  completeWithRecords <- completeWithRecords(withRecords)
  completeWithoutRecords <- completeWithoutRecords(withoutRecords)
  removeRevokedRecords <- removeRevokedRecords(revokedNonFetches)
Why don't you clean up the `record` container from records that belong to revoked partitions and have no downstream sinks?
Described in the comment above.
The whole fix is based on postponing the removal of records - we wait until the assignment is made to make sure that we can revoke the right records. This is a small step towards handling greedy rebalance protocols.
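To make the idea concrete, here is a simplified, hypothetical model of that postponement (not the PR's actual `State` implementation): revoked partitions are only remembered at revocation time, and their records are dropped once the new assignment is known, so records of partitions that come back to this consumer survive the rebalance.

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical model: records are kept through revocation and removed only after the
// next assignment, and only for partitions that were not assigned back to this consumer.
final case class PendingRecords(
  byPartition: Map[TopicPartition, List[String]],
  pendingRevoked: Set[TopicPartition]
) {

  // Revocation: remember which partitions were revoked, but keep their records.
  def onRevoked(revoked: Set[TopicPartition]): PendingRecords =
    copy(pendingRevoked = pendingRevoked ++ revoked)

  // Assignment: drop records only for revoked partitions that did not come back.
  def onAssigned(assigned: Set[TopicPartition]): PendingRecords =
    PendingRecords(
      byPartition = byPartition -- (pendingRevoked -- assigned),
      pendingRevoked = Set.empty
    )
}
```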
@@ -276,7 +286,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
   def pollConsumer(state: State[F, K, V]): F[ConsumerRecords] =
     withConsumer
       .blocking { consumer =>
-        val assigned = consumer.assignment.toSet
+        val assigned = state.assignments
Could you please explain how your changes address the issue? The current codebase doesn't seem to have any flaw in managing assigned partitions.
Described in the comment above 👍
Closes #1051