KAFKA-17002: Integrated partition leader epoch for Persister APIs (KIP-932) #16842
Conversation
I'm not quite sure about this one. Integrating the leader epoch into the persister write API is good, but I am not sure whether the handling of the fenced case makes sense. Essentially, if the Write RPC returns a fenced error, it means that a later leader epoch is being used for this share-partition state, and that means another share-partition leader for this partition is using a later epoch, which presumably means this SPL is philosophically dead.
Hmm, the only scenario I was thinking about is that if the partition epoch is bumped for some other reason and the same broker remains the partition leader, then a retry would be needed to fetch the latest partition epoch. If that's not true then I agree that a retry doesn't make sense. I was also thinking whether we should have an identifier/boolean in the SPL that can be toggled when the SPL is not a leader. Once toggled, this identifier will fail all requests with a not-leader exception, if any call is received, so the client can switch to the new leader.
@mumrah Can you please review as well and provide input on the partition epoch. @AndrewJSchofield might be right and we might just want to fail the request when such a leader error occurs. Just confirming prior to making the change.
@apoorvmittal10 : Thanks for the PR. Left a comment.
+ " leader epoch, current leader epoch: {}", groupId, topicIdPartition, leaderEpoch); | ||
leaderEpoch = getLeaderEpoch(topicIdPartition.topicPartition()); | ||
// Retry the write state operation. | ||
return isWriteShareGroupStateSuccessful(stateBatches, true); |
This doesn't look right. ShareFetch can only be served at the leader. If this broker is not the leader, leaderEpoch is -1. In that case, we need to send a NOT_LEADER_OR_FOLLOWER error to the client instead of retrying.
The easiest thing on receiving FencedLeaderEpochException is probably to always send an error back to the client and mark the SharePartition as invalid. This way, if the broker becomes the leader again, a new SharePartition will be created and trigger the initialization of group state from the share coordinator.
We probably also want to check if the broker is still the leader on each shareFetch/shareAck request.
I have addressed the concern.
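A minimal sketch of the handling Jun describes above, assuming illustrative names (onWriteStateError, partitionCacheMap, SharePartitionKey) rather than the PR's exact code:

// Hedged sketch: on a fenced write, fail the request and drop the cached
// SharePartition rather than retrying, so a later request re-creates it and
// re-initializes group state from the share coordinator.
private void onWriteStateError(SharePartitionKey key,
                               CompletableFuture<Errors> future,
                               Throwable throwable) {
    if (throwable instanceof FencedLeaderEpochException
            || throwable instanceof NotLeaderOrFollowerException) {
        // A later leader epoch owns this share-partition state, so this
        // share-partition leader is stale; surface the error to the client.
        partitionCacheMap.remove(key);
        future.complete(Errors.NOT_LEADER_OR_FOLLOWER);
        return;
    }
    future.complete(Errors.forException(throwable));
}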
/**
 * The leader epoch is used to track the partition epoch.
 */
private int leaderEpoch;
Should this be volatile since it's written and read by different threads?
It's now read-only in the share partition.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
@@ -66,6 +66,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

extra new line
Removed.
@@ -594,7 +602,8 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
private void maybeCompleteInitializationWithException(
    SharePartitionKey sharePartitionKey,
    CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
    Throwable throwable) {
    Throwable throwable
) {
Why do we need to make this change? The current format seems to match other existing code.
Reverted.
if (partitionOrError.isLeft()) {
    log.error("Failed to get partition leader for topic partition: {}-{} due to error: {}",
        tp.topic(), tp.partition(), partitionOrError.left().get());
    return -1;
We need to return a NOT_LEADER_OR_FOLLOWER error to the client if the broker is not the leader.
Done.
partitionCacheMap.remove(sharePartitionKey);
future.completeExceptionally(throwable);
return;
partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
Is there a particular reason to use computeIfPresent instead of remove? The latter is more intuitive.
I was trying to solve something which is not required. I have moved to remove.
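For reference, the two forms discussed above behave the same when the key is present; the simpler one just states the intent directly (illustrative snippet):

// A null-returning remapping function removes the mapping...
partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
// ...which is exactly what remove does, with clearer intent.
partitionCacheMap.remove(sharePartitionKey);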
log.error("Error initializing share partition with key {}", sharePartitionKey, throwable); | ||
future.completeExceptionally(throwable); | ||
// TODO: Should the return be -1 or throw an exception? | ||
private int getLeaderEpoch(TopicPartition tp) { |
We don't use getters. So this can just be leaderEpoch.
Done.
if (sharePartition != null) {
    CompletableFuture<Errors> future = new CompletableFuture<>();
    sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
        if (throwable != null) {
            handleSharePartitionException(sharePartitionKey, throwable);
To be consistent, we want to add the same logic for shareFetch too. To do this, we need to extend FetchParams such that fetchOnlyLeader() is true for share fetch and handle NotLeaderOrFollowerException accordingly.
Done.
@junrao Sorry, I was in the middle of my changes when you last reviewed. But thanks for the feedback. I have completed the changes now, can you please re-review. Sorry for the trouble.
Thanks for the PR.
maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable);
}

private void handleSharePartitionException(
I suggest "fenced" should appear in the method name. This is essentially seeing if the exception indicates fencing and then discarding the share partition from the cache.
My idea was to have a general exception-handling method as we might find other exceptions which need some handling as well. Do you think we should strictly have a fenced-specific handling method?
My point was that you're really only handling a couple of exceptions at the moment, and they're both fencing-related. Anyway, just a suggestion.
Thanks, I have changed the method name. Might move to a generic one if required in the future.
private void handleSharePartitionException(
    SharePartitionKey sharePartitionKey,
    Throwable throwable
) {
    if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
        log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
        // The share partition is fenced hence remove the partition from map and let the client retry.
        // But surface the error to the client so client might take some action i.e. re-fetch
        // the metadata and retry the fetch on new leader.
        partitionCacheMap.remove(sharePartitionKey);
Does the SharePartition need to be put into a fenced state? Removing it from the cache is good for future requests, but is that enough for the object which is already in existence?
So we remove the share partition from the cache at 2 places: 1) when initialization failed, 2) when a fenced error occurs.
For 1, it's safe as it's still in the initialization state.
For 2, I was of mixed opinion. As all interactions with the share partition currently happen while fetching the instance from the cache, once it is removed or re-initialized the new state should appear. But if the old share partition instance is already held by some other thread then acknowledge will anyway fail, but fetch can succeed. Do you think it would be sensible to have another state in SharePartition such as Fenced, which once set means the fetch lock on that share partition cannot be attained? Do you think we should have an active status check on all Share Partition APIs as well?
cc: @adixitconfluent
I think it's better to have a new state. Should we have a state such as TERMINATED/FENCED, because a share partition should also be removed from SPM in case the topic partition is deleted or becomes a follower (https://issues.apache.org/jira/browse/KAFKA-17783)? Wdyt @apoorvmittal10 @AndrewJSchofield
Yes, I think these states sound sensible.
Do you think we should have an active status check on all Share Partition
Yes, I agree on this as well
Do we have a state machine written down somewhere for SharePartition?
Hmmm, not yet. Let me write down the state transitions in a single place where we can see them.
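A rough sketch of what such a lifecycle could look like; the state names other than ACTIVE and FENCED are assumptions drawn from this thread, not the final design:

// Hypothetical SharePartition lifecycle:
//   EMPTY -> INITIALIZING -> ACTIVE   on a successful read of state from the persister
//   any state -> FENCED               on NotLeaderOrFollowerException / FencedStateEpochException,
//                                     or when the topic partition is deleted or becomes a follower
enum SharePartitionState {
    EMPTY,
    INITIALIZING,
    ACTIVE,
    FAILED,
    FENCED
}

// Guard used by the public APIs so a fenced instance already held by another
// thread (e.g. for fetch) can no longer acquire the fetch lock or acknowledge.
private boolean stateNotActive() {
    return partitionState() != SharePartitionState.ACTIVE;
}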
}

private int leaderEpoch(TopicPartition tp) {
    Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);
I've checked the error codes here and the KIP looks right to me.
int maxBytes,
FetchIsolation isolation,
Optional<ClientMetadata> clientMetadata,
boolean shareFetchRequest) {
I wonder whether this is a bit literal. Why are we supplying a ClientMetadata here in the share fetch case? That seems to me to be concerned with fetch-from-follower. If we didn't supply a ClientMetadata, then fetchOnlyLeader() would already return true without needing the new flag.
So ClientMetadata includes details like rackId and listenerName, which I see the regular fetch utilizes for figuring out the read replica. I am not sure whether that would be relevant for ShareFetch in the future and whether we should always skip it for ShareFetch. Hence I have added the additional shareFetchRequest boolean. Please let me know what you think. Also adding @junrao to provide his inputs.
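A hedged sketch of how the new flag might feed into FetchParams.fetchOnlyLeader(); the surrounding conditions are approximated from this discussion rather than copied from the actual class:

// Share fetches are always served from the leader, regardless of whether
// ClientMetadata (rack id, listener name) was supplied for read-replica selection.
public boolean fetchOnlyLeader() {
    return isFromFollower() || shareFetchRequest || (isFromConsumer() && clientMetadata.isEmpty());
}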
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
@@ -1743,7 +1750,7 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
    .setGroupId(this.groupId)
    .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
        Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
            topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
            topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
If we get an error like UNKNOWN_TOPIC_OR_PARTITION or FENCED_STATE_EPOCH, should we remove the sharePartition too?
I have handled it.
@@ -113,6 +127,7 @@ public String toString() {
    ", maxBytes=" + maxBytes +
    ", isolation=" + isolation +
    ", clientMetadata=" + clientMetadata +
    ", shareFetchRequest=" + shareFetchRequest +
Should we include the new param in hashCode and equals too?
Missed it, thanks, added.
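For completeness, a small sketch of folding the new field into equals and hashCode, assuming the field set visible in the toString diff above (other fields elided, java.util.Objects imported):

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    FetchParams that = (FetchParams) o;
    return maxBytes == that.maxBytes
        && shareFetchRequest == that.shareFetchRequest
        && isolation == that.isolation
        && clientMetadata.equals(that.clientMetadata);
}

@Override
public int hashCode() {
    return Objects.hash(maxBytes, isolation, clientMetadata, shareFetchRequest);
}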
@@ -617,22 +650,35 @@ private void maybeCompleteInitializationWithException(
    return;
}

// Remove the partition from the cache as it's failed to initialize.
At the beginning of this method, we check for LeaderNotAvailableException. When do we get that exception? ReadShareGroupStateResponse doesn't seem to have that error.
LeaderNotAvailableException is an internal exception from SharePartition to SharePartitionManager which can occur only when the SharePartition is in the process of initialization and not yet complete. Hence for that period we just keep the requests in the purgatory.
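In other words, something like the following shape at the top of maybeCompleteInitializationWithException (illustrative, reusing the names visible in the diffs above):

// LeaderNotAvailableException is internal: the SharePartition is still
// initializing, so do not complete the future exceptionally; the request stays
// in the purgatory and is retried once initialization completes.
if (throwable instanceof LeaderNotAvailableException) {
    log.debug("The share partition with key {} is not initialized yet.", sharePartitionKey);
    return;
}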
private void handleFencedSharePartitionException(
    SharePartitionKey sharePartitionKey,
    Throwable throwable
) {
Should we include UNKNOWN_TOPIC_OR_PARTITION below too?
Added.
if (!future.isDone()) {
    Errors error = Errors.forException(throwable);
    future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
        tp -> tp, tp -> new PartitionData().setErrorCode(error.code()).setErrorMessage(error.message()))));
We lose the error message in throwable when converting it to Error.
Done.
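The fix amounts to carrying the throwable's own message through instead of the generic Errors message; a minimal sketch under that assumption:

Errors error = Errors.forException(throwable);
future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
    tp -> tp,
    tp -> new PartitionData()
        .setErrorCode(error.code())
        // Keep the context from the original exception rather than error.message().
        .setErrorMessage(throwable.getMessage()))));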
@@ -608,6 +640,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
private void maybeCompleteInitializationWithException(
    SharePartitionKey sharePartitionKey,
    CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
    TopicIdPartition topicIdPartition,
Do we need this since it's part of SharePartitionKey?
Thanks, removed.
SharePartition partition = new SharePartition(
    sharePartitionKey.groupId(),
    sharePartitionKey.topicIdPartition(),
    leaderEpoch,
ShareFetchUtils.leaderEpoch can return exceptions like NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION. Should we handle that at the partition level?
Handled at the partition level. But I have added a couple of TODOs in the PR to take them up in follow-ups as this PR is getting bigger. I am of the opinion that we can take the changes incrementally.
@@ -127,7 +127,7 @@ public void onComplete() {
    shareFetchData.future().complete(result);
} catch (Exception e) {
    log.error("Error processing delayed share fetch request", e);
    shareFetchData.future().completeExceptionally(e);
    sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
Typically, the error code is returned in responseLogResult. So we need to handle the error there too.
As we are going through a massive refactor of delayed share fetch with the min bytes PR, and the handling of this error will have quite a few possible cases of its own, I have created a jira to take that up in a following PR: https://issues.apache.org/jira/browse/KAFKA-17887
@junrao I think the error from LogReadResult is already handled as we set the respective partition level error while parsing response in ShareFetchUtils.processFetchResponse method. Am I missing something?
Yes, it seems this is no longer an issue.
Thanks for confirming @junrao, I have closed the ticket.
@@ -617,22 +650,35 @@ private void maybeCompleteInitializationWithException(
    return;
}

// Remove the partition from the cache as it's failed to initialize.
partitionCacheMap.remove(sharePartitionKey);
- We probably want to verify the leader epoch in SharePartition before removal to avoid a newly created SharePartition being removed by an old request.
- We piggyback the removal in an error response. The downside is that there are quite a few error places to handle and it is reactive. An alternative is to have a partitionLeaderEpochChange listener. We can then proactively remove a SharePartition when the leader epoch changes.
Both are great suggestions, do you think it would be right to have them in follow-up PRs?
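Very roughly, the two ideas could look like the following; both the epoch guard and the listener interface are purely hypothetical shapes, not existing APIs:

// 1) Guard the eviction so an old request cannot remove a newer SharePartition.
partitionCacheMap.computeIfPresent(sharePartitionKey, (key, sharePartition) ->
    sharePartition.leaderEpoch() <= requestLeaderEpoch ? null : sharePartition);

// 2) Proactive alternative: a listener invoked when the partition leader epoch
//    changes, so the cached SharePartition is removed eagerly rather than on error.
interface PartitionLeaderEpochChangeListener {
    void onLeaderEpochChange(TopicPartition topicPartition, int newLeaderEpoch);
}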
@junrao I have made a lot of changes and the PR is getting bigger and bigger. I am reverting some of my local changes and will write a plan with jiras so we can proceed.
/**
 * The leader epoch is used to track the partition epoch.
 */
private final int leaderEpoch;
Making this final implies the SharePartition is now scoped to the lifetime of a partition's leader epoch. Since SPs are managed by the node which is the leader for that partition, I guess this is already the case (and not really a problem). We normally expect the leader to move when the leader epoch increases, but I'm not sure if that's always the case.
Hypothetically, if a leader epoch increased but the leader did not move, would it be possible to reuse the SharePartition state? Or would we need to re-load its state from the persister anyways?
Yeah, it's for the share partition's lifetime. We are marking the SharePartition fenced and unusable if an error occurs, which means the state should be re-loaded from the persister to function again.
@@ -128,4 +131,13 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
    Optional.empty(), true).timestampAndOffsetOpt();
    return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}

static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
Returning an OptionalInt would be a bit nicer than throwing there (maybe). If we actually want a helper method that throws, we should incorporate that into the name (e.g., leaderEpochOrThrow).
With an OptionalInt we again need to check and pass the right error. So either we have this method or we create leaderEpochOrThrow in the replica manager.
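For context, the helper under discussion looks roughly like this (a sketch based on the snippets above; the broker-side errors it can surface include NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION):

static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
    Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);
    if (partitionOrError.isLeft()) {
        // Throw so the caller's fenced/not-leader handling can surface the error
        // to the client instead of silently returning -1.
        throw partitionOrError.left().get().exception();
    }
    return partitionOrError.right().get().getLeaderEpoch();
}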
So here is the summary. Sorry, the PR is growing, as now it's more about error handling and less about just leader epoch propagation. The current PR has the following:
Some follow-ups to do:
I am planning to have a couple of follow-up PRs, if @junrao @AndrewJSchofield @mumrah think that is fine. As with the tests and changes, this PR is getting bigger and bigger.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Throwable throwable
) {
    topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));
This is weird. We actually don't know which partition causes throwable. Ideally, we should just set a top level error instead of applying it on each partition. We probably shouldn't remove the SharePartition here since we are not sure which partition to remove.
Hmmm, though you are right, the problem is that the ShareFetchResponse contains both Fetch and Acknowledgement responses. If we set a top level error code then the acknowledgement in the request will also be reported as failed, although the acknowledgement might have succeeded.
The reason I was removing the SharePartition in this condition was that recreating a new SharePartition is cheap. However, if you think we shouldn't remove it then I will avoid that and see what scenarios we encounter during the tests (moving partitions in a cluster).
// as this situation is not expected.
log.error("Error processing share fetch request", e);
if (erroneous == null) {
    erroneous = new HashMap<>();
It seems it's more intuitive to initialize erroneous as an empty map so that we don't need to deal with it being null.
The exception occurring from getOrCreateSharePartition will be rare, hence I was of the opinion of not initializing the erroneous map and doing that only when needed.
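The eager variant suggested above, for comparison (illustrative; an empty HashMap costs almost nothing and removes the null handling from every other path):

// Always start with an empty map so callers never need a null check; the rare
// getOrCreateSharePartition failure simply adds an entry.
Map<TopicIdPartition, Throwable> erroneous = new HashMap<>();
try {
    // ... getOrCreateSharePartition(sharePartitionKey) per requested partition ...
} catch (Exception e) {
    log.error("Error processing share fetch request", e);
    erroneous.put(topicIdPartition, e);
}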
private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
    Map<TopicIdPartition, Throwable> erroneous) {
    future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
        Map.Entry::getKey, entry -> new PartitionData().setErrorCode(Errors.forException(entry.getValue()).code()).setErrorMessage(entry.getValue().getMessage()))));
The line is getting too long. Could we avoid calling entry.getValue() twice?
Agree, done.
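One way the line could be reshaped to call entry.getValue() once and stay within a reasonable width (a sketch, not necessarily the exact change that was made):

private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
                                             Map<TopicIdPartition, Throwable> erroneous) {
    future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
        Map.Entry::getKey,
        entry -> {
            Throwable t = entry.getValue();
            return new PartitionData()
                .setErrorCode(Errors.forException(t).code())
                .setErrorMessage(t.getMessage());
        })));
}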
}

private boolean stateNotActive() {
    return partitionState() != SharePartitionState.ACTIVE;
We probably should throw a fenced exception and let the caller handle it. This can be done in a separate PR.
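Along the lines of that suggestion (deferred to a separate PR), the check could fail fast instead of returning a boolean; a hedged sketch:

// Throwing lets the SharePartitionManager's fenced-exception handling take over,
// rather than relying on every caller to test the boolean.
private void throwIfNotActive() {
    if (partitionState() != SharePartitionState.ACTIVE) {
        throw new FencedStateEpochException(
            "Share partition " + groupId + "-" + topicIdPartition + " is not active.");
    }
}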
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));

SharePartition sp1 = mock(SharePartition.class);
// Do not make the share partition acquirable hence it shouldn't be removed from the cache,
Hmm, should we explicitly mock sp1.maybeAcquireFetchLock to false?
Thanks, done.
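With Mockito, the explicit mock discussed above is a one-liner (assuming maybeAcquireFetchLock() returns a boolean, as the fetch-lock discussion earlier implies):

SharePartition sp1 = mock(SharePartition.class);
// Make the intent explicit: sp1 can never acquire the fetch lock in this test,
// so it is not fetched from and must stay in the cache.
when(sp1.maybeAcquireFetchLock()).thenReturn(false);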
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
    sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");
Hmm, why does the completed future have only 1 partition?
Because the fetch didn't happen for the other partition as it was not acquirable. Though the final response from KafkaApis consolidates the topic-partitions from erroneous (from KafkaApis), the share partition manager's topic partitions, and the remaining ones with empty data.
Hi @junrao, thanks for the feedback. Addressed and replied. https://issues.apache.org/jira/browse/KAFKA-17463?jql=text%20~%20%22testShareGroups%22
@apoorvmittal10 : Thanks for the updated PR. LGTM
…P-932) (apache#16842) The PR integrates leader epoch for partition while invoking Persister APIs. The write RPC is retried once on leader epoch failure. Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield <[email protected]>, Jun Rao <[email protected]>, David Arthur <[email protected]>
The PR integrates leader epoch for partition while invoking Persister APIs. The write RPC is retried once on leader epoch failure.