KAFKA-17002: Integrated partition leader epoch for Persister APIs (KIP-932) #16842

Merged: 23 commits merged into apache:trunk on Oct 30, 2024

Conversation

apoorvmittal10 (Collaborator):

This PR integrates the partition leader epoch when invoking the Persister APIs. The write RPC is retried once on a leader epoch failure.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

apoorvmittal10 self-assigned this on Aug 8, 2024
apoorvmittal10 added the KIP-932 (Queues for Kafka) label on Aug 8, 2024
AndrewJSchofield (Collaborator) left a comment:

I'm not quite sure about this one. Integrating the leader epoch into the persister write API is good, but I am not sure whether the handling of the fenced case makes sense. Essentially, if the Write RPC returns a fenced error, it means that a later leader epoch is being used for this share-partition state, and that means another share-partition leader for this partition is using a later epoch, which presumably means this SPL is philosophically dead.

apoorvmittal10 (Collaborator Author):

> I'm not quite sure about this one. Integrating the leader epoch into the persister write API is good, but I am not sure whether the handling of the fenced case makes sense. Essentially, if the Write RPC returns a fenced error, it means that a later leader epoch is being used for this share-partition state, and that means another share-partition leader for this partition is using a later epoch, which presumably means this SPL is philosophically dead.

Hmm, the only scenario I was thinking of is that if the partition epoch is bumped for some other reason and the same broker remains the partition leader, then a retry would be needed to fetch the latest partition epoch. If that's not true, then I agree the retry doesn't make sense.

I was also wondering whether we should have an identifier/boolean in the SPL that is toggled when the SPL is no longer the leader. Once set, this flag would fail any incoming request with a not-leader exception, so the client can switch to the new leader.

apoorvmittal10 (Collaborator Author):

@mumrah Could you please review as well and provide input on the partition epoch? @AndrewJSchofield might be right that we should simply fail the request when such a leader error occurs. Just confirming before making the change.

apoorvmittal10 (Collaborator Author):

@junrao @mumrah Could you please advise on the partition epoch handling? It would be great if you could take a look.

junrao (Contributor) left a comment:

@apoorvmittal10 : Thanks for the PR. Left a comment.

+ " leader epoch, current leader epoch: {}", groupId, topicIdPartition, leaderEpoch);
leaderEpoch = getLeaderEpoch(topicIdPartition.topicPartition());
// Retry the write state operation.
return isWriteShareGroupStateSuccessful(stateBatches, true);
Contributor:

This doesn't look right. ShareFetch can only be served at the leader. If this broker is not the leader, leaderEpoch is -1. In that case, we need to send a NOT_LEADER_OR_FOLLOWER error to the client instead of retrying.

The easiest thing on receiving FencedLeaderEpochException is probably to always send an error back to the client and mark the SharePartition as invalid. This way, if the broker becomes the leader again, a new SharePartition will be created and trigger the initialization of group state from the share coordinator.

We probably also want to check if the broker is still the leader on each shareFetch/shareAck request.

apoorvmittal10 (Collaborator Author):

I have addressed the concern.

/**
* The leader epoch is used to track the partition epoch.
*/
private int leaderEpoch;
Contributor:

Should this be volatile since it's written and read by different threads?

apoorvmittal10 (Collaborator Author):

It's now read-only in SharePartition.

github-actions bot added the core (Kafka Broker) label on Oct 16, 2024
junrao (Contributor) left a comment:

@apoorvmittal10 : Thanks for the updated PR. A few more comments.

@@ -66,6 +66,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


Contributor:

extra new line

apoorvmittal10 (Collaborator Author):

Removed.

@@ -594,7 +602,8 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
private void maybeCompleteInitializationWithException(
SharePartitionKey sharePartitionKey,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Throwable throwable) {
Throwable throwable
) {
Contributor:

Why do we need to make this change? The current format seems to match other existing code.

apoorvmittal10 (Collaborator Author):

Reverted.

if (partitionOrError.isLeft()) {
log.error("Failed to get partition leader for topic partition: {}-{} due to error: {}",
tp.topic(), tp.partition(), partitionOrError.left().get());
return -1;
Contributor:

We need to return a NOT_LEADER_OR_FOLLOWER error to the client if the broker is not the leader.

apoorvmittal10 (Collaborator Author):

Done.

partitionCacheMap.remove(sharePartitionKey);
future.completeExceptionally(throwable);
return;
partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null);
Contributor:

Is there a particular reason to use computeIfPresent instead of remove? The latter is more intuitive.

apoorvmittal10 (Collaborator Author):

I was trying to solve something that isn't actually required. I have moved to remove().

log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
future.completeExceptionally(throwable);
// TODO: Should the return be -1 or throw an exception?
private int getLeaderEpoch(TopicPartition tp) {
Contributor:

We don't use getters. So this can just be leaderEpoch.

apoorvmittal10 (Collaborator Author):

Done.

if (sharePartition != null) {
CompletableFuture<Errors> future = new CompletableFuture<>();
sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
if (throwable != null) {
handleSharePartitionException(sharePartitionKey, throwable);
Contributor:

To be consistent, we want to add the same logic for shareFetch too.

To do this, we need to extend FetchParams such that fetchOnlyLeader() is true for share fetch and handle NotLeaderOrFollowerException accordingly.
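For illustration, a minimal sketch of that idea, assuming a simplified FetchParams-like class. Only the shareFetchRequest field name is taken from this PR (see the toString() hunk later in the review); the other fields and the leader-only logic are approximations, not the actual Kafka class.

import java.util.Optional;

// Sketch only: a stand-in for the real FetchParams; field and method shapes are assumptions.
final class FetchParamsSketch {
    static final class ClientMetadata { } // placeholder for the real ClientMetadata type

    final int replicaId;
    final Optional<ClientMetadata> clientMetadata;
    final boolean shareFetchRequest; // flag added in this PR

    FetchParamsSketch(int replicaId, Optional<ClientMetadata> clientMetadata, boolean shareFetchRequest) {
        this.replicaId = replicaId;
        this.clientMetadata = clientMetadata;
        this.shareFetchRequest = shareFetchRequest;
    }

    boolean isFromFollower() {
        // Follower fetches carry a non-negative broker id; consumer fetches use -1.
        return replicaId >= 0;
    }

    // A share fetch must always be served by the partition leader, so the new flag
    // forces leader-only reads even when client metadata (rack id etc.) is supplied.
    boolean fetchOnlyLeader() {
        return isFromFollower() || shareFetchRequest || clientMetadata.isEmpty();
    }
}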

apoorvmittal10 (Collaborator Author):

Done.

apoorvmittal10 (Collaborator Author):

@junrao Sorry, I was in the middle of my changes when you last reviewed, but thanks for the feedback. I have completed the changes now; could you please re-review? Sorry for the trouble.

AndrewJSchofield (Collaborator) left a comment:

Thanks for the PR.

maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable);
}

private void handleSharePartitionException(
Collaborator:

I suggest "fenced" should appear in the method name. This is essentially seeing if the exception indicates fencing and then discarding the share partition from the cache.

apoorvmittal10 (Collaborator Author):

My idea was to have a general exception-handling method, as we might find other exceptions that need handling as well. Do you think we should strictly have a fenced-specific handling method?

Collaborator:

My point was that you're really only handling a couple of exceptions at the moment, and they're both fencing-related. Anyway, just a suggestion.

apoorvmittal10 (Collaborator Author):

Thanks, I have changed the method name. We might move to a generic one if required in the future.

private void handleSharePartitionException(
SharePartitionKey sharePartitionKey,
Throwable throwable
) {
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
partitionCacheMap.remove(sharePartitionKey);
Collaborator:

Does the SharePartition need to be put into a fenced state? Removing it from the cache is good for future requests, but is that enough for the object which is already in existence?

apoorvmittal10 (Collaborator Author):

So we remove the share partition from the cache in two places: 1) when initialization fails, and 2) when a fenced error occurs.
For 1, it's safe as the partition is still in the initialization state.

For 2, I was of mixed opinion. All interactions with a share partition currently go through fetching the instance from the cache, so once it is removed or re-initialized the new state should be visible. But if an old share partition instance is already held by another thread, acknowledge will fail anyway, while fetch could still succeed. Do you think it would be sensible to add a Fenced state to SharePartition, so that once it is set the fetch lock on that share partition can no longer be acquired? Do you think we should have an active-status check on all SharePartition APIs as well?
cc: @adixitconfluent
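For what it's worth, a minimal sketch of the Fenced-state idea being discussed here, assuming an enum-based partition state and the maybeAcquireFetchLock()/releaseFetchLock() pattern mentioned elsewhere in this review. It is illustrative only, not the actual SharePartition code.

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: once a share partition is fenced, the fetch lock can no longer be
// acquired, so no new records are handed out from the stale instance.
class SharePartitionSketch {
    enum SharePartitionState { EMPTY, INITIALIZING, ACTIVE, FAILED, FENCED }

    private volatile SharePartitionState partitionState = SharePartitionState.ACTIVE;
    private final AtomicBoolean fetchLock = new AtomicBoolean(false);

    void markFenced() {
        partitionState = SharePartitionState.FENCED;
    }

    boolean maybeAcquireFetchLock() {
        // Refuse the lock unless the partition is still ACTIVE.
        return partitionState == SharePartitionState.ACTIVE && fetchLock.compareAndSet(false, true);
    }

    void releaseFetchLock() {
        fetchLock.set(false);
    }
}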

adixitconfluent (Contributor) commented Oct 25, 2024:

I think it's better to have a new state. Should we have a TERMINATED/FENCED state, since a share partition should also be removed from the SPM if the topic partition is deleted or becomes a follower (https://issues.apache.org/jira/browse/KAFKA-17783)? Wdyt @apoorvmittal10 @AndrewJSchofield

Collaborator:

Yes, I think these states sound sensible.

Contributor:

> Do you think we should have an active status check on all Share Partition APIs as well?

Yes, I agree on this as well

Contributor:

Do we have a state machine written down somewhere for SharePartition?

apoorvmittal10 (Collaborator Author):

Hmmm, not yet. Let me document the state transitions in a single place where we can see them.

}

private int leaderEpoch(TopicPartition tp) {
Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);
Collaborator:

I've checked the error codes here and the KIP looks right to me.

int maxBytes,
FetchIsolation isolation,
Optional<ClientMetadata> clientMetadata,
boolean shareFetchRequest) {
Collaborator:

I wonder whether this is a bit literal. Why are we supplying a ClientMetadata here in the share fetch case? That seems to me to be concerned with fetch-from-follower. If we didn't supply a ClientMetadata, then fetchOnlyLeader() would already return true without needing the new flag.

apoorvmittal10 (Collaborator Author):

ClientMetadata includes details such as rackId and listenerName, which I see the regular fetch uses to determine the read replica. I'm not sure whether that would be relevant for ShareFetch in the future, or whether we should always skip it for ShareFetch; hence I added the additional share-fetch boolean. Please let me know what you think. Also adding @junrao to provide his input.

junrao (Contributor) left a comment:

@apoorvmittal10 : Thanks for the updated PR. A few more comments.

@@ -1743,7 +1750,7 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
Contributor:

If we get an error like UNKNOWN_TOPIC_OR_PARTITION or FENCED_STATE_EPOCH, should we remove the sharePartition too?

apoorvmittal10 (Collaborator Author) commented Oct 29, 2024:

I have handled it.

@@ -113,6 +127,7 @@ public String toString() {
", maxBytes=" + maxBytes +
", isolation=" + isolation +
", clientMetadata=" + clientMetadata +
", shareFetchRequest=" + shareFetchRequest +
Contributor:

Should we include the new param in hashCode and equals too?

apoorvmittal10 (Collaborator Author):

Missed it, thanks added.

@@ -617,22 +650,35 @@ private void maybeCompleteInitializationWithException(
return;
}

// Remove the partition from the cache as it's failed to initialize.
Contributor:

At the beginning of this method, we check for LeaderNotAvailableException. When do we get that exception? ReadShareGroupStateResponse doesn't seem to have that error.

apoorvmittal10 (Collaborator Author):

LeaderNotAvailableException is an internal exception from SharePartition to SharePartitionManager that can occur only while the SharePartition is still initializing and not yet complete. Hence, for that period we just keep the requests in the purgatory.
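As a rough sketch of that behaviour, reusing the method shape and field names quoted in the surrounding diff hunks (this is a fragment of the manager class, not standalone code, and the exact handling in the PR may differ):

private void maybeCompleteInitializationWithException(
    SharePartitionKey sharePartitionKey,
    CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
    Throwable throwable
) {
    if (throwable instanceof LeaderNotAvailableException) {
        // The share partition is still initializing: do not fail the future and do not
        // evict the cache entry; the request waits in the purgatory and is retried once
        // initialization completes.
        log.debug("Share partition with key {} is not yet initialized.", sharePartitionKey);
        return;
    }
    // Any other error means initialization failed: evict the partition from the cache
    // and surface the error to the client.
    partitionCacheMap.remove(sharePartitionKey);
    future.completeExceptionally(throwable);
}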

private void handleFencedSharePartitionException(
SharePartitionKey sharePartitionKey,
Throwable throwable
) {
Contributor:

Should we include UNKNOWN_TOPIC_OR_PARTITION below too?

apoorvmittal10 (Collaborator Author):

Added.

if (!future.isDone()) {
Errors error = Errors.forException(throwable);
future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
tp -> tp, tp -> new PartitionData().setErrorCode(error.code()).setErrorMessage(error.message()))));
Contributor:

We lose the error message in throwable when converting it to Error.
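A sketch of the fix against the snippet quoted above: keep the message carried by the original exception rather than the generic Errors message (same names as the quoted code; illustrative only).

if (!future.isDone()) {
    Errors error = Errors.forException(throwable);
    future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
        tp -> tp,
        // Propagate the message from the original throwable so the client sees the
        // specific failure reason instead of the generic per-error text.
        tp -> new PartitionData()
            .setErrorCode(error.code())
            .setErrorMessage(throwable.getMessage()))));
}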

apoorvmittal10 (Collaborator Author):

Done.

@@ -608,6 +640,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
private void maybeCompleteInitializationWithException(
SharePartitionKey sharePartitionKey,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
TopicIdPartition topicIdPartition,
Contributor:

Do we need this since it's part of SharePartitionKey?

apoorvmittal10 (Collaborator Author):

Thanks, removed.

SharePartition partition = new SharePartition(
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
leaderEpoch,
Contributor:

ShareFetchUtils.leaderEpoch can return exceptions like NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION. Should we handle that at the partition level?

apoorvmittal10 (Collaborator Author):

Handled at the partition level. But I have added a couple of TODOs in the PR to address in follow-ups, as this PR is getting bigger. I think we can take the changes incrementally.

@@ -127,7 +127,7 @@ public void onComplete() {
shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
shareFetchData.future().completeExceptionally(e);
sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
Contributor:

Typically, the error code is returned in responseLogResult. So we need to handle the error there too.

apoorvmittal10 (Collaborator Author):

Since a large refactor of delayed share fetch is coming with the min-bytes PR, and handling this error will have quite a few cases of its own, I created a JIRA to take it up in a follow-up PR: https://issues.apache.org/jira/browse/KAFKA-17887

apoorvmittal10 (Collaborator Author):

@junrao I think the error from LogReadResult is already handled, as we set the respective partition-level error while parsing the response in ShareFetchUtils.processFetchResponse. Am I missing something?

Contributor:

Yes, it seems this is no longer an issue.

apoorvmittal10 (Collaborator Author):

Thanks for confirming @junrao, I have closed the ticket.

@@ -617,22 +650,35 @@ private void maybeCompleteInitializationWithException(
return;
}

// Remove the partition from the cache as it's failed to initialize.
partitionCacheMap.remove(sharePartitionKey);
Contributor:

  1. We probably want to verify the leader epoch in SharePartition before removal to avoid a newly created SharePartition being removed by an old request.
  2. We piggyback the removal on an error response. The downside is that there are quite a few error paths to handle and it is reactive. An alternative is to have a partitionLeaderEpochChange listener, so we can proactively remove a SharePartition when the leader epoch changes; a hypothetical sketch of this idea is shown below.
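A hypothetical sketch of the listener idea in point 2. The interface, its method, and the eviction logic below are invented for illustration and do not correspond to an existing ReplicaManager API; the markFenced() call mirrors the Fenced state discussed earlier in this review.

// Hypothetical sketch only: names are illustrative, not an existing Kafka interface.
interface PartitionLeaderEpochListener {
    void onLeaderEpochChange(TopicIdPartition topicIdPartition, int newLeaderEpoch);
}

// The share partition manager could register such a listener per cached partition and
// proactively fence and evict the SharePartition when the epoch moves on, instead of
// waiting for a fenced error on the next persister call.
PartitionLeaderEpochListener listener = (topicIdPartition, newLeaderEpoch) -> {
    SharePartitionKey key = sharePartitionKey(groupId, topicIdPartition);
    SharePartition sharePartition = partitionCacheMap.remove(key);
    if (sharePartition != null) {
        sharePartition.markFenced(); // hypothetical method
    }
};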

apoorvmittal10 (Collaborator Author):

Both are great suggestions; do you think it would be reasonable to address them in follow-up PRs?

apoorvmittal10 (Collaborator Author):

@junrao I have made a lot of changes and the PR is getting bigger and bigger. I am reverting some of my local changes and will write up a plan with JIRAs so we can proceed.

/**
* The leader epoch is used to track the partition epoch.
*/
private final int leaderEpoch;
Contributor:

Making this final implies the SharePartition is now scoped to the lifetime of a partition's leader epoch. Since SPs are managed by the node which is the leader for that partition, I guess this is already the case (and not really a problem). We normally expect the leader to move when the leader epoch increases, but I'm not sure if that's always the case.

Hypothetically, if a leader epoch increased but the leader did not move, would it be possible to reuse the SharePartition state? Or would we need to re-load its state from the persister anyways?

apoorvmittal10 (Collaborator Author):

Yeah, it's for the share partition's lifetime. We mark the SharePartition fenced and unusable if such an error occurs, which means the state has to be re-loaded from the persister to continue.

private void handleSharePartitionException(
SharePartitionKey sharePartitionKey,
Throwable throwable
) {
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
partitionCacheMap.remove(sharePartitionKey);
Contributor:

Do we have a state machine written down somewhere for SharePartition?

@@ -128,4 +131,13 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}

static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
Contributor:

Returning an OptionalInt would be a bit nicer than throwing there (maybe). If we actually want a helper method that throws, we should incorporate that into the name (e.g., leaderEpochOrThrow)
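A sketch of the OptionalInt alternative, reusing the getPartitionOrError shape quoted earlier in this review; whether Partition exposes exactly these accessors from Java is an assumption, so treat this as illustrative.

// Sketch only: an empty OptionalInt means "not the leader or unknown partition",
// leaving it to the caller to map that to the right error code.
static OptionalInt maybeLeaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
    Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);
    if (partitionOrError.isLeft()) {
        return OptionalInt.empty();
    }
    Partition partition = partitionOrError.right().get();
    if (!partition.isLeader()) {
        return OptionalInt.empty();
    }
    return OptionalInt.of(partition.getLeaderEpoch());
}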

apoorvmittal10 (Collaborator Author):

With an OptionalInt we would still need to check it and map to the right error. So either we keep this method or create leaderEpochOrThrow in ReplicaManager.

apoorvmittal10 (Collaborator Author) commented Oct 29, 2024:

So here is the summary. Sorry, the PR is growing, as it is now more about error handling and less about just leader epoch propagation.

The current PR does the following:

  • Fetches the leader epoch and passes it to SharePartition. If a leader epoch (fenced/state epoch), unknown topic partition, or unknown group error arrives, the SharePartition is removed from the cache and, at the same time, marked fenced.
  • For this, a Fenced state has been added in the PR.
  • There are two checks for the fenced state in SharePartition: a) while acquiring records, and b) while acquiring the fetch lock. Hence any in-flight request that tries to acquire records will fail, and even before that, requests will start failing while acquiring the lock. This prevents us from sending any new records to consumers for fenced share partitions. However, I didn't add the check to the acknowledge and release APIs, as they will eventually fail while persisting, if they should.
  • Added the leader epoch error handling at the SharePartition level, which means that if 3 of 5 topic partitions in a request are fenced, the request proceeds for the remaining 2 share partitions rather than failing completely.

Some follow-ups to do:

  • https://issues.apache.org/jira/browse/KAFKA-17510 - Additional error handling and triggering the fetch once the initialization of the share partition is complete and the request is in purgatory.
  • https://issues.apache.org/jira/browse/KAFKA-17887 - Error handling for the response log result in delayed share fetch.
  • Better state machine transitions in SharePartition.
  • Consider using a listener to fence and remove a SharePartition.
  • Add a leader epoch check before removing a share partition instance.
  • Send a response for error partitions as well; currently, if all partitions fail or succeed then a fail/success response is sent for all of them, otherwise only for the partitions for which data was fetched.
  • Additionally, I am thinking of renaming ShareFetchData to ShareFetch and providing better handling of future completion, as the code is a bit fragmented today.

I am planning a couple of follow-up PRs, if @junrao @AndrewJSchofield @mumrah think that is fine, as with tests and changes this PR is getting bigger and bigger.

junrao (Contributor) left a comment:

@apoorvmittal10 : Thanks for the updated PR. A few more comments.

CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Throwable throwable
) {
topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));
Contributor:

This is weird. We actually don't know which partition causes throwable. Ideally, we should just set a top level error instead of applying it on each partition. We probably shouldn't remove the SharePartition here since we are not sure which partition to remove.

apoorvmittal10 (Collaborator Author):

Hmmm, you are right, but the problem is that ShareFetchResponse contains both the fetch and acknowledgement responses. If we set a top-level error code, then the acknowledgement in the request will also be reported as failed, even though it might have succeeded.

The reason I was removing the SharePartition in this case is that recreating a SharePartition is cheap. However, if you think we shouldn't remove it, I will avoid that and see what scenarios we encounter during the tests (moving partitions in a cluster).

// as this situation is not expected.
log.error("Error processing share fetch request", e);
if (erroneous == null) {
erroneous = new HashMap<>();
Contributor:

It seems it's more intuitive to initialize erroneous as an empty map so that we don't need to deal with it being null.

apoorvmittal10 (Collaborator Author):

The exception from getOrCreateSharePartition will be rare, so I preferred not to initialize erroneous up front and only do so when needed.

private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Map<TopicIdPartition, Throwable> erroneous) {
future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey, entry -> new PartitionData().setErrorCode(Errors.forException(entry.getValue()).code()).setErrorMessage(entry.getValue().getMessage()))));
Contributor:

The line is getting too long. Could we avoid calling entry.getValue() twice?
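One way to address both points, sketched against the quoted method (same names, just reading the throwable once and breaking the long line):

private void completeShareFetchWithException(
    CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
    Map<TopicIdPartition, Throwable> erroneous
) {
    future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
        Map.Entry::getKey,
        entry -> {
            // Read the throwable once and reuse it for both the code and the message.
            Throwable throwable = entry.getValue();
            return new PartitionData()
                .setErrorCode(Errors.forException(throwable).code())
                .setErrorMessage(throwable.getMessage());
        })));
}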

apoorvmittal10 (Collaborator Author):

Agree, done.

}

private boolean stateNotActive() {
return partitionState() != SharePartitionState.ACTIVE;
Contributor:

We probably should throw a fenced exception and let the caller handle it. This can be done in a separate PR.
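A sketch of the suggested alternative, assuming the partitionState() accessor from the quoted code and the FencedStateEpochException already used elsewhere in this PR; the method name is illustrative.

// Sketch only: fail fast with a fenced error instead of returning a boolean, so the
// caller can surface FENCED_STATE_EPOCH (or similar) directly to the client.
private void throwIfNotActive() {
    if (partitionState() != SharePartitionState.ACTIVE) {
        throw new FencedStateEpochException("Share partition is in state " + partitionState());
    }
}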

when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));

SharePartition sp1 = mock(SharePartition.class);
// Do not make the share partition acquirable hence it shouldn't be removed from the cache,
Contributor:

Hmm, should we explicitly mock sp1.maybeAcquireFetchLock to false?

apoorvmittal10 (Collaborator Author):

Thanks, done.


CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");
Contributor:

Hmm, why does the completed future have only 1 partition?

apoorvmittal10 (Collaborator Author):

Because the fetch didn't happen for the other partition, as it was not acquirable. The final response from KafkaApis, though, consolidates the topic partitions from erroneous (in KafkaApis), the share partition manager's topic partitions, and the remaining ones with empty data.

apoorvmittal10 (Collaborator Author):

Hi @junrao, thanks for the feedback. Addressed and replied.
There is one test that was previously marked as flaky, but I can see the failure in this PR as well. I have updated the ticket for the flaky test.

https://issues.apache.org/jira/browse/KAFKA-17463?jql=text%20~%20%22testShareGroups%22

junrao (Contributor) left a comment:

@apoorvmittal10 : Thanks for the updated PR. LGTM

junrao merged commit ff116df into apache:trunk on Oct 30, 2024
5 of 7 checks passed
abhishekgiri23 pushed a commit to abhishekgiri23/kafka that referenced this pull request Nov 2, 2024
…P-932) (apache#16842)

The PR integrates the partition leader epoch when invoking the Persister APIs. The write RPC is retried once on a leader epoch failure.

Reviewers: Abhinav Dixit, Andrew Schofield, Jun Rao, David Arthur
Labels: core (Kafka Broker), KIP-932 (Queues for Kafka)

5 participants