KAFKA-17002: Integrated partition leader epoch for Persister APIs (KIP-932) #16842
File: SharePartition.java
@@ -180,6 +180,11 @@ public static RecordState forId(byte id) {
      */
     private final TopicIdPartition topicIdPartition;

+    /**
+     * The leader epoch is used to track the partition epoch.
+     */
+    private final int leaderEpoch;
+
Review comment: Making this final implies the SharePartition is now scoped to the lifetime of a partition's leader epoch. Since SharePartitions are managed by the node which is the leader for that partition, I guess this is already the case (and not really a problem). We normally expect the leader to move when the leader epoch increases, but I'm not sure if that's always the case. Hypothetically, if a leader epoch increased but the leader did not move, would it be possible to reuse the SharePartition state? Or would we need to re-load its state from the persister anyway?
Reply: Yes, it's scoped to the share partition's lifetime. We mark the SharePartition fenced and unusable if an error occurs, which means the state should be re-loaded from the persister before it can function again.

     /**
      * The in-flight record is used to track the state of a record that has been fetched from the
      * leader. The state of the record is used to determine if the record should be re-fetched or if it
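Because the field is final, a SharePartition instance is pinned to the leader epoch it was constructed with. Below is a minimal sketch of the lifecycle described in the reply above, using simplified stand-in types rather than the real Kafka classes: once an instance is fenced it is dropped from the cache, and the next access builds a new instance with the current epoch, which re-loads its state from the persister.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in types; not the actual SharePartition/SharePartitionManager classes.
public class LeaderEpochScopedCacheSketch {

    // Immutable with respect to its leader epoch, mirroring the final field added above.
    record CachedSharePartition(String groupId, String topicPartition, int leaderEpoch) { }

    private final Map<String, CachedSharePartition> cache = new ConcurrentHashMap<>();

    // Hypothetical stand-in for reading state back from the persister (ReadShareGroupState).
    private CachedSharePartition loadFromPersister(String groupId, String topicPartition, int currentLeaderEpoch) {
        return new CachedSharePartition(groupId, topicPartition, currentLeaderEpoch);
    }

    CachedSharePartition getOrCreate(String groupId, String topicPartition, int currentLeaderEpoch) {
        return cache.computeIfAbsent(groupId + ":" + topicPartition,
            key -> loadFromPersister(groupId, topicPartition, currentLeaderEpoch));
    }

    // On a fencing error the cached instance is discarded; a later fetch re-creates it with the
    // new leader epoch, which re-reads the share-partition state from the persister.
    void markFenced(String groupId, String topicPartition) {
        cache.remove(groupId + ":" + topicPartition);
    }
}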
@@ -279,6 +284,7 @@ public static RecordState forId(byte id) {
     SharePartition(
         String groupId,
         TopicIdPartition topicIdPartition,
+        int leaderEpoch,
         int maxInFlightMessages,
         int maxDeliveryCount,
         int defaultRecordLockDurationMs,
@@ -290,6 +296,7 @@ public static RecordState forId(byte id) {
     ) {
         this.groupId = groupId;
         this.topicIdPartition = topicIdPartition;
+        this.leaderEpoch = leaderEpoch;
         this.maxInFlightMessages = maxInFlightMessages;
         this.maxDeliveryCount = maxDeliveryCount;
         this.cachedState = new ConcurrentSkipListMap<>();
@@ -340,7 +347,7 @@ public CompletableFuture<Void> maybeInitialize() {
             .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
                 .setGroupId(this.groupId)
                 .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
-                    Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), 0)))))
+                    Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), leaderEpoch)))))
                 .build())
             .build()
         ).whenComplete((result, exception) -> {
@@ -1666,7 +1673,7 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
             .setGroupId(this.groupId)
             .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
                 Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                    topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
+                    topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))

Review comment: If we get an error like UNKNOWN_TOPIC_OR_PARTITION or FENCED_STATE_EPOCH, should we remove the sharePartition too?
Reply: I have handled it.

             ).build()).build())
             .whenComplete((result, exception) -> {
                 if (exception != null) {
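The review comment above is about what should happen when the persister rejects a write now that the leader epoch is included. Below is a hedged sketch, not the code merged in this PR, of mapping such failures to a fencing action on the cached share partition; the fenceSharePartition callback is a hypothetical stand-in for removing the entry from SharePartitionManager's cache, and the exception classes are assumed to live in org.apache.kafka.common.errors.

import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// Sketch only: maps persister write failures to a fencing action on the cached share partition.
final class PersisterWriteErrorHandler {

    // Hypothetical callback standing in for removing the share partition from the cache.
    private final Runnable fenceSharePartition;

    PersisterWriteErrorHandler(Runnable fenceSharePartition) {
        this.fenceSharePartition = fenceSharePartition;
    }

    void onWriteFailure(Throwable throwable) {
        // FENCED_STATE_EPOCH, NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION all indicate
        // that this broker's view of the partition is stale, so the cached state must not be reused.
        if (throwable instanceof FencedStateEpochException
                || throwable instanceof NotLeaderOrFollowerException
                || throwable instanceof UnknownTopicOrPartitionException) {
            fenceSharePartition.run();
        }
    }
}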
File: SharePartitionManager.java
@@ -16,6 +16,7 @@
  */
 package kafka.server.share;

+import kafka.cluster.Partition;
 import kafka.server.ReplicaManager;

 import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -69,8 +70,10 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;

 import scala.jdk.javaapi.CollectionConverters;
+import scala.util.Either;

 /**
  * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@@ -260,11 +263,13 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
         this.shareGroupMetrics.shareAcknowledgement();
         Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
         acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
-            SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
+            SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
+            SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
             if (sharePartition != null) {
                 CompletableFuture<Errors> future = new CompletableFuture<>();
                 sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
                     if (throwable != null) {
+                        handleSharePartitionException(sharePartitionKey, throwable);

Review comment: To be consistent, we want to add the same logic for shareFetch too. To do this, we need to extend FetchParams such that
Reply: Done.

                         future.complete(Errors.forException(throwable));
                         return;
                     }
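When acknowledge fails, the per-partition future above is completed with Errors.forException(throwable) after letting handleSharePartitionException inspect the failure. For reference, a small self-contained example of how that translation behaves for the fencing-related exceptions discussed in this PR; the expected values in the comments are an assumption based on the usual Kafka error mapping.

import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;

public class ErrorsMappingExample {
    public static void main(String[] args) {
        // Errors.forException matches the exception type to the corresponding error code.
        System.out.println(Errors.forException(new NotLeaderOrFollowerException()));      // NOT_LEADER_OR_FOLLOWER
        System.out.println(Errors.forException(new UnknownTopicOrPartitionException()));  // UNKNOWN_TOPIC_OR_PARTITION
        System.out.println(Errors.forException(new RuntimeException("boom")));            // UNKNOWN_SERVER_ERROR
    }
}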
@@ -328,14 +333,16 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part

         Map<TopicIdPartition, CompletableFuture<Errors>> futuresMap = new HashMap<>();
         topicIdPartitions.forEach(topicIdPartition -> {
-            SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
+            SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
+            SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
             if (sharePartition == null) {
                 log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);
                 futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
             } else {
                 CompletableFuture<Errors> future = new CompletableFuture<>();
                 sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, throwable) -> {
                     if (throwable != null) {
+                        handleSharePartitionException(sharePartitionKey, throwable);
                         future.complete(Errors.forException(throwable));
                         return;
                     }
@@ -479,6 +486,30 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet
         }
     }

+    /**
+     * The handleFetchException method is used to handle the exception that occurred while reading from log.
+     * The method will handle the exception for each topic-partition in the request. The share partition
+     * might get removed from the cache.
+     * <p>
+     * The replica read request might error out for one share partition
+     * but as we cannot determine which share partition errored out, we might remove all the share partitions
+     * in the request.
+     *
+     * @param groupId The group id in the share fetch request.
+     * @param topicIdPartitions The topic-partitions in the replica read request.
+     * @param future The future to complete with the exception.
+     * @param throwable The exception that occurred while fetching messages.
+     */
+    public void handleFetchException(
+        String groupId,
+        Set<TopicIdPartition> topicIdPartitions,
+        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Throwable throwable
+    ) {
+        topicIdPartitions.forEach(topicIdPartition -> handleSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));
+        maybeCompleteShareFetchExceptionally(future, topicIdPartitions, throwable);
+    }
+
     /**
      * The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session.
     *
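A hedged sketch of how a caller could hand a failed replica read to handleFetchException. The LogReadHandler class and the ShareFetchExceptionHandler interface below are illustrative placeholders, not the actual broker wiring; only the handleFetchException signature mirrors the method added above.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;

// Hypothetical caller-side sketch: when the asynchronous log read fails, every share partition in
// the request is passed through the exception handling and the share-fetch future is completed.
final class LogReadHandler {

    interface ShareFetchExceptionHandler {
        void handleFetchException(String groupId,
                                  Set<TopicIdPartition> topicIdPartitions,
                                  CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
                                  Throwable throwable);
    }

    private final ShareFetchExceptionHandler manager;

    LogReadHandler(ShareFetchExceptionHandler manager) {
        this.manager = manager;
    }

    void completeOnReadFailure(String groupId,
                               Set<TopicIdPartition> topicIdPartitions,
                               CompletableFuture<Map<TopicIdPartition, PartitionData>> shareFetchFuture,
                               CompletableFuture<Void> logReadFuture) {
        logReadFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // It is not known which partition caused the replica read to fail, so all share
                // partitions in the request go through the exception handling.
                manager.handleFetchException(groupId, topicIdPartitions, shareFetchFuture, throwable);
            }
        });
    }
}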
@@ -543,7 +574,7 @@ void processShareFetch(ShareFetchData shareFetchData) {
             // TopicPartitionData list will be populated only if the share partition is already initialized.
             sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
                 if (throwable != null) {
-                    maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
+                    maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), topicIdPartition, throwable);
                 }
             });
         });
@@ -565,21 +596,24 @@ void processShareFetch(ShareFetchData shareFetchData) {
             addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this),
                 delayedShareFetchWatchKeys);
         } catch (Exception e) {
-            // In case exception occurs then release the locks so queue can be further processed.
-            log.error("Error processing fetch queue for share partitions", e);
-            if (!shareFetchData.future().isDone()) {
-                shareFetchData.future().completeExceptionally(e);
-            }
+            // Complete the whole fetch request with an exception if there is an error processing.
+            // The exception currently can be thrown only if there is an error while initializing
+            // the share partition. But skip the processing for other share partitions in the request
+            // as this situation is not expected.
+            log.error("Error processing share fetch request", e);
+            maybeCompleteShareFetchExceptionally(shareFetchData.future(), shareFetchData.partitionMaxBytes().keySet(), e);
         }
     }

     private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
         return partitionCacheMap.computeIfAbsent(sharePartitionKey,
             k -> {
                 long start = time.hiResClockMs();
+                int leaderEpoch = leaderEpoch(sharePartitionKey.topicIdPartition().topicPartition());
                 SharePartition partition = new SharePartition(
                     sharePartitionKey.groupId(),
                     sharePartitionKey.topicIdPartition(),
+                    leaderEpoch,

Review comment: ShareFetchUtils.leaderEpoch can return exceptions like NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION. Should we handle that at the partition level?
Reply: Handled at partition level. But I have added a couple of TODOs in the PR to take them up in follow-ups as this PR is getting bigger. I am of the opinion that we can take the changes incrementally.

                     maxInFlightMessages,
                     maxDeliveryCount,
                     defaultRecordLockDurationMs,
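The leaderEpoch(...) lookup inside computeIfAbsent can throw, as noted in the review comment above. A useful property the code relies on, shown in the self-contained example below, is that when the mapping function of ConcurrentHashMap.computeIfAbsent throws, no entry is inserted: a failed share-partition creation leaves nothing broken in the cache and the exception propagates to processShareFetch.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Demonstrates the ConcurrentHashMap.computeIfAbsent contract that getOrCreateSharePartition
// relies on: if the mapping function throws, nothing is cached and the exception reaches the caller.
public class ComputeIfAbsentFailureExample {
    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        try {
            cache.computeIfAbsent("group-1:orders-0", key -> {
                // Stand-in for leaderEpoch(...) failing with NOT_LEADER_OR_FOLLOWER.
                throw new IllegalStateException("This broker is not the leader for orders-0");
            });
        } catch (IllegalStateException e) {
            System.out.println("Creation failed, nothing cached: " + e.getMessage());
        }
        // The failed key is absent, so the next fetch attempt retries the creation.
        System.out.println(cache.containsKey("group-1:orders-0"));  // false
    }
}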
@@ -597,6 +631,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
     private void maybeCompleteInitializationWithException(
         SharePartitionKey sharePartitionKey,
         CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        TopicIdPartition topicIdPartition,

Review comment: Do we need this since it's part of SharePartitionKey?
Reply: Thanks, removed.

         Throwable throwable) {
         if (throwable instanceof LeaderNotAvailableException) {
             log.debug("The share partition with key {} is not initialized yet", sharePartitionKey);
@@ -606,22 +641,51 @@ private void maybeCompleteInitializationWithException(
             return;
         }

+        // Remove the partition from the cache as it's failed to initialize.

Review comment: At the beginning of this method, we check for LeaderNotAvailableException. When do we get that exception? ReadShareGroupStateResponse doesn't seem to have that error.

+        partitionCacheMap.remove(sharePartitionKey);

Reply: Both are great suggestions, do you think it would be right to have them in follow-up PRs?
+        // The partition initialization failed, so complete the request with the exception.
+        // The server should not be in this state, so log the error on broker and surface the same
+        // to the client. The broker should not be in this state, investigate the root cause of the error.
+        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+        maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable);
+    }
+
+    private void handleSharePartitionException(

Review comment: I suggest "fenced" should appear in the method name. This is essentially seeing if the exception indicates fencing and then discarding the share partition from the cache.
Reply: My idea was to have a general exception handling method, as we might find other exceptions which need some handling as well. Do you think we should strictly have a fenced-specific handling method?
Review comment: My point was that you're really only handling a couple of exceptions at the moment, and they're both fencing-related. Anyway, just a suggestion.
Reply: Thanks, I have changed the method name. Might move to a generic one if required in future.

+        SharePartitionKey sharePartitionKey,
+        Throwable throwable
+    ) {

Review comment: Should we include UNKNOWN_TOPIC_OR_PARTITION below too?
Reply: Added.
         if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
             log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
             // The share partition is fenced hence remove the partition from map and let the client retry.
             // But surface the error to the client so client might take some action i.e. re-fetch
             // the metadata and retry the fetch on new leader.
             partitionCacheMap.remove(sharePartitionKey);

Review thread:
- So we remove the share partition from the cache in two places: 1) when initialization fails, 2) when a fenced error occurs. For 2, I was of mixed opinion. As all interactions with the share partition currently happen while fetching the instance from the cache, once it is removed or re-initialized the new state should appear. But if an old share partition instance is already held by some other thread then
- I think it's better to have a new state. Should we have a state as
- Yes, I think these states sound sensible.
- Yes, I agree on this as well.
- Do we have a state machine written down somewhere for SharePartition?
- Hmmm, not yet. Let me create the state transitions better in a single place where we can see the transitions.
-            future.completeExceptionally(throwable);
-            return;
         }
+    }
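The review thread above converges on modelling explicit SharePartition lifecycle states rather than only removing the instance from the cache; the exact state names are not captured in this extract. The enum below is purely a hypothetical illustration of that idea, not the design that was eventually merged.

// Hypothetical illustration only: possible lifecycle states for a SharePartition and the
// transitions discussed in the review thread. The names are placeholders, not the merged design.
enum SharePartitionState {
    EMPTY,        // created, persister state not read yet
    INITIALIZING, // ReadShareGroupState in flight
    ACTIVE,       // usable for fetch/acknowledge
    FAILED,       // initialization or persister write failed; state must be re-loaded
    FENCED;       // leader epoch / state epoch fenced; instance must not be reused

    boolean canTransitionTo(SharePartitionState next) {
        switch (this) {
            case EMPTY:        return next == INITIALIZING;
            case INITIALIZING: return next == ACTIVE || next == FAILED || next == FENCED;
            case ACTIVE:       return next == FAILED || next == FENCED;
            case FAILED:
            case FENCED:       return false; // terminal: a new SharePartition is created instead
            default:           return false;
        }
    }
}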

-        // The partition initialization failed, so complete the request with the exception.
-        // The server should not be in this state, so log the error on broker and surface the same
-        // to the client. As of now this state is in-recoverable for the broker, and we should
-        // investigate the root cause of the error.
-        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
-        future.completeExceptionally(throwable);
+    private void maybeCompleteShareFetchExceptionally(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
+        if (!future.isDone()) {
+            Errors error = Errors.forException(throwable);
+            future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
+                tp -> tp, tp -> new PartitionData().setErrorCode(error.code()).setErrorMessage(error.message()))));

Review comment: We lose the error message in throwable when converting it to Error.
Reply: Done.

+        }
+    }
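A hedged sketch of the follow-up implied by the thread above: complete the share-fetch future with per-partition error responses that carry the exception's own message (throwable.getMessage()) rather than the generic Errors message. The classes used are standard Kafka ones that appear in this diff, but the exact merged code may differ.

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;

// Sketch mirroring maybeCompleteShareFetchExceptionally, adjusted to keep the original message.
public class ShareFetchErrorCompletionExample {
    public static void main(String[] args) {
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0));
        CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
        Throwable throwable = new NotLeaderOrFollowerException("Leader moved for orders-0");

        Errors error = Errors.forException(throwable);
        if (!future.isDone()) {
            future.complete(List.of(tp).stream().collect(Collectors.toMap(
                partition -> partition,
                partition -> new PartitionData()
                    .setPartitionIndex(partition.partition())
                    .setErrorCode(error.code())
                    // Assumption: surface the original exception message rather than error.message().
                    .setErrorMessage(throwable.getMessage()))));
        }

        future.join().forEach((partition, data) ->
            System.out.println(partition + " -> " + data.errorCode() + " : " + data.errorMessage()));
    }
}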

+    private int leaderEpoch(TopicPartition tp) {
+        Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);

Review comment: I've checked the error codes here and the KIP looks right to me.

+        if (partitionOrError.isLeft()) {
+            log.debug("Failed to get partition leader for topic partition: {}-{} due to error: {}",
+                tp.topic(), tp.partition(), partitionOrError.left().get());
+            throw partitionOrError.left().get().exception();
+        }

+        Partition partition = partitionOrError.right().get();
+        if (!partition.isLeader()) {
+            log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition());
+            throw new NotLeaderOrFollowerException();
+        }
+        return partition.getLeaderEpoch();
     }

     private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {

Review thread:
- Typically, the error code is returned in responseLogResult. So we need to handle the error there too.
- As we are going with a massive refactor in delayed share fetch for the min-bytes PR, and the handling of this error will have quite a few possible cases of its own, I have created a JIRA to take that up in a following PR: https://issues.apache.org/jira/browse/KAFKA-17887
- @junrao I think the error from LogReadResult is already handled, as we set the respective partition-level error while parsing the response in the ShareFetchUtils.processFetchResponse method. Am I missing something?
- Yes, it seems this is no longer an issue.
- Thanks for confirming @junrao, I have closed the ticket.