KAFKA-17002: Integrated partition leader epoch for Persister APIs (KIP-932) #16842

Merged: 23 commits, Oct 30, 2024
Changes from 15 of 23 commits

Commits
b31e4e4
KAFKA-17002: Integrated partition leader epoch for Persister APIs (KI…
apoorvmittal10 Aug 8, 2024
44d537c
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Aug 23, 2024
8030ae8
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
Aug 28, 2024
4e1598d
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 15, 2024
a0ee7a6
Build fixes
apoorvmittal10 Oct 16, 2024
4b4b226
Handling leader epoch changes
apoorvmittal10 Oct 17, 2024
d8e38c4
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 17, 2024
57f3b99
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 21, 2024
2694ee2
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 22, 2024
1103acb
Throwing and handling exception
apoorvmittal10 Oct 22, 2024
f08f14e
Additional fixes
apoorvmittal10 Oct 22, 2024
a6cb2de
Handling fetch exception
apoorvmittal10 Oct 22, 2024
c91fb93
Simplifying remove
apoorvmittal10 Oct 22, 2024
edd56a9
Correcting log line
apoorvmittal10 Oct 22, 2024
b27c805
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 22, 2024
d8bda03
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 24, 2024
24a8098
Changed method name, moved partition fetch to utils
apoorvmittal10 Oct 24, 2024
203d685
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 26, 2024
a1c3dc8
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 26, 2024
99ba4a7
Handling partition exceptions
apoorvmittal10 Oct 28, 2024
ceb5720
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 29, 2024
8e61c98
Addressing review comments
apoorvmittal10 Oct 30, 2024
f6145f8
Merge remote-tracking branch 'upstream/trunk' into KAFKA-17002
apoorvmittal10 Oct 30, 2024
@@ -127,7 +127,7 @@ public void onComplete() {
shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
shareFetchData.future().completeExceptionally(e);
sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
Contributor:

Typically, the error code is returned in responseLogResult. So we need to handle the error there too.

Collaborator Author:

As we are going through a massive refactor of delayed share fetch with the min-bytes PR, and the handling of this error will have quite a few possible cases of its own, I have created a JIRA to take it up in a follow-up PR: https://issues.apache.org/jira/browse/KAFKA-17887

Collaborator Author:

@junrao I think the error from LogReadResult is already handled, as we set the respective partition-level error while parsing the response in the ShareFetchUtils.processFetchResponse method. Am I missing something?

Contributor:

Yes, it seems this is no longer an issue.

Collaborator Author:

Thanks for confirming @junrao, I have closed the ticket.
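
(For context on the partition-level error handling referred to above: the idea is that a read error for one partition is mapped onto that partition's entry in the share fetch response rather than failing the whole request. The small sketch below illustrates this; the helper name and the readError parameter are illustrative, not the actual ShareFetchUtils code.)

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;

final class PartitionErrorSketch {
    // Illustrative only: build the response entry for one partition, surfacing a
    // per-partition read error instead of failing the whole share fetch.
    static PartitionData toPartitionData(TopicIdPartition tp, Errors readError) {
        PartitionData partitionData = new PartitionData().setPartitionIndex(tp.partition());
        if (readError != Errors.NONE) {
            partitionData.setErrorCode(readError.code()).setErrorMessage(readError.message());
        }
        return partitionData;
    }
}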

} finally {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
11 changes: 9 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -180,6 +180,11 @@ public static RecordState forId(byte id) {
*/
private final TopicIdPartition topicIdPartition;

/**
* The leader epoch is used to track the partition epoch.
*/
private final int leaderEpoch;
Contributor:

Making this final implies the SharePartition is now scoped to the lifetime of a partition's leader epoch. Since SPs are managed by the node which is the leader for that partition, I guess this is already the case (and not really a problem). We normally expect the leader to move when the leader epoch increases, but I'm not sure if that's always the case.

Hypothetically, if a leader epoch increased but the leader did not move, would it be possible to reuse the SharePartition state? Or would we need to re-load its state from the persister anyways?

Collaborator Author:

Yeah, it's for the share partition's lifetime. We mark the SharePartition fenced and unusable if an error occurs, which means the state has to be re-loaded from the persister before it can be used again.


/**
* The in-flight record is used to track the state of a record that has been fetched from the
* leader. The state of the record is used to determine if the record should be re-fetched or if it
@@ -279,6 +284,7 @@ public static RecordState forId(byte id) {
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightMessages,
int maxDeliveryCount,
int defaultRecordLockDurationMs,
@@ -290,6 +296,7 @@
) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
this.leaderEpoch = leaderEpoch;
this.maxInFlightMessages = maxInFlightMessages;
this.maxDeliveryCount = maxDeliveryCount;
this.cachedState = new ConcurrentSkipListMap<>();
@@ -340,7 +347,7 @@ public CompletableFuture<Void> maybeInitialize() {
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), 0)))))
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), leaderEpoch)))))
.build())
.build()
).whenComplete((result, exception) -> {
@@ -1666,7 +1673,7 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
Contributor:

If we get an error like UNKNOWN_TOPIC_OR_PARTITION or FENCED_STATE_EPOCH, should we remove the sharePartition too?

Collaborator Author (@apoorvmittal10, Oct 29, 2024):

I have handled it.

).build()).build())
.whenComplete((result, exception) -> {
if (exception != null) {
96 changes: 80 additions & 16 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -16,6 +16,7 @@
*/
package kafka.server.share;

import kafka.cluster.Partition;
import kafka.server.ReplicaManager;

import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -69,8 +70,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import scala.jdk.javaapi.CollectionConverters;
import scala.util.Either;

/**
* The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@@ -260,11 +263,13 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
this.shareGroupMetrics.shareAcknowledgement();
Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition != null) {
CompletableFuture<Errors> future = new CompletableFuture<>();
sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
if (throwable != null) {
handleSharePartitionException(sharePartitionKey, throwable);
Contributor:

To be consistent, we want to add the same logic for shareFetch too.

To do this, we need to extend FetchParams such that fetchOnlyLeader() is true for share fetch and handle NotLeaderOrFollowerException accordingly.

Collaborator Author:

Done.
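
(The KafkaApis.scala hunk at the end of this diff passes the extra boolean into FetchParams for exactly this purpose. As a rough sketch of the intent — the surrounding helper and field names are approximations, not the verbatim FetchParams source:)

// Approximate sketch only; field and helper names are assumptions, not the verbatim Kafka source.
public boolean fetchOnlyLeader() {
    // Share fetch requests must always be served by the partition leader, so the new
    // shareFetchRequest flag forces leader-only reads in addition to the existing rules.
    return isFromFollower() || (isFromConsumer() && clientMetadata.isEmpty()) || shareFetchRequest;
}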

future.complete(Errors.forException(throwable));
return;
}
@@ -328,14 +333,16 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part

Map<TopicIdPartition, CompletableFuture<Errors>> futuresMap = new HashMap<>();
topicIdPartitions.forEach(topicIdPartition -> {
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition == null) {
log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);
futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
} else {
CompletableFuture<Errors> future = new CompletableFuture<>();
sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, throwable) -> {
if (throwable != null) {
handleSharePartitionException(sharePartitionKey, throwable);
future.complete(Errors.forException(throwable));
return;
}
@@ -479,6 +486,30 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet
}
}

/**
* The handleFetchException method is used to handle the exception that occurred while reading from log.
* The method will handle the exception for each topic-partition in the request. The share partition
* might get removed from the cache.
* <p>
* The replica read request might error out for one share partition
* but as we cannot determine which share partition errored out, we might remove all the share partitions
* in the request.
*
* @param groupId The group id in the share fetch request.
* @param topicIdPartitions The topic-partitions in the replica read request.
* @param future The future to complete with the exception.
* @param throwable The exception that occurred while fetching messages.
*/
public void handleFetchException(
String groupId,
Set<TopicIdPartition> topicIdPartitions,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Throwable throwable
) {
topicIdPartitions.forEach(topicIdPartition -> handleSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));
maybeCompleteShareFetchExceptionally(future, topicIdPartitions, throwable);
}

/**
* The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session.
*
@@ -543,7 +574,7 @@ void processShareFetch(ShareFetchData shareFetchData) {
// TopicPartitionData list will be populated only if the share partition is already initialized.
sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
if (throwable != null) {
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), topicIdPartition, throwable);
}
});
});
@@ -565,21 +596,24 @@ void processShareFetch(ShareFetchData shareFetchData) {
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this),
delayedShareFetchWatchKeys);
} catch (Exception e) {
// In case exception occurs then release the locks so queue can be further processed.
log.error("Error processing fetch queue for share partitions", e);
if (!shareFetchData.future().isDone()) {
shareFetchData.future().completeExceptionally(e);
}
// Complete the whole fetch request with an exception if there is an error processing.
// The exception currently can be thrown only if there is an error while initializing
// the share partition. But skip the processing for other share partitions in the request
// as this situation is not expected.
log.error("Error processing share fetch request", e);
maybeCompleteShareFetchExceptionally(shareFetchData.future(), shareFetchData.partitionMaxBytes().keySet(), e);
}
}

private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
return partitionCacheMap.computeIfAbsent(sharePartitionKey,
k -> {
long start = time.hiResClockMs();
int leaderEpoch = leaderEpoch(sharePartitionKey.topicIdPartition().topicPartition());
SharePartition partition = new SharePartition(
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
leaderEpoch,
Contributor:

ShareFetchUtils.leaderEpoch can return exceptions like NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION. Should we handle that at the partition level?

Collaborator Author:

Handled at the partition level. But I have added a couple of TODOs in the PR to take them up in follow-ups, as this PR is getting bigger. I am of the opinion that we can take the changes incrementally.

maxInFlightMessages,
maxDeliveryCount,
defaultRecordLockDurationMs,
@@ -597,6 +631,7 @@ private SharePartition getOrCreateSharePartition(SharePartitio
private void maybeCompleteInitializationWithException(
SharePartitionKey sharePartitionKey,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
TopicIdPartition topicIdPartition,
Contributor:

Do we need this since it's part of SharePartitionKey?

Collaborator Author:

Thanks, removed.

Throwable throwable) {
if (throwable instanceof LeaderNotAvailableException) {
log.debug("The share partition with key {} is not initialized yet", sharePartitionKey);
@@ -606,22 +641,51 @@ private void maybeCompleteInitializationWithException(
return;
}

// Remove the partition from the cache as it's failed to initialize.
Contributor:

At the beginning of this method, we check for LeaderNotAvailableException. When do we get that exception? ReadShareGroupStateResponse doesn't seem to have that error.

Collaborator Author:

LeaderNotAvailableException is an internal exception from SharePartition to SharePartitionManager which can occur only while the SharePartition is still in the process of initializing and not yet complete. Hence, for that period we just keep the requests in purgatory.
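
(Pulling the two halves of this hunk together, the initialization error handling reads roughly as follows; this is a simplified restatement of the surrounding diff, not verbatim code.)

// Simplified consolidation of maybeCompleteInitializationWithException from this diff.
if (throwable instanceof LeaderNotAvailableException) {
    // The share partition is still initializing: keep the request in purgatory and keep
    // the SharePartition cached so a later attempt can complete the initialization.
    log.debug("The share partition with key {} is not initialized yet", sharePartitionKey);
    return;
}
// Any other failure means initialization did not succeed: drop the cached entry and
// complete the share fetch with a per-partition error.
partitionCacheMap.remove(sharePartitionKey);
log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable);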

partitionCacheMap.remove(sharePartitionKey);
Contributor:

  1. We probably want to verify the leader epoch in SharePartition before removal, to avoid a newly created SharePartition being removed by an old request.
  2. We piggyback the removal on an error response. The downside is that there are quite a few error places to handle, and it is reactive. An alternative is to have a partitionLeaderEpochChange listener; we could then proactively remove a SharePartition when the leader epoch changes.

Collaborator Author:

Both are great suggestions. Do you think it would be right to take them up in follow-up PRs?
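
(For illustration, the proactive alternative suggested above could look something like the sketch below. The listener interface, its callback, and the leaderEpoch() accessor are hypothetical and are not part of this PR or of ReplicaManager's existing API.)

// Hypothetical sketch of a leader-epoch-change listener; the listener API is invented for illustration.
interface PartitionLeaderEpochListener {
    void onLeaderEpochChange(TopicPartition topicPartition, int newLeaderEpoch);
}

// SharePartitionManager could register such a listener and proactively evict SharePartitions
// whose leader epoch is stale, instead of reacting to persister or fetch errors.
PartitionLeaderEpochListener listener = (tp, newEpoch) -> {
    partitionCacheMap.forEach((key, sharePartition) -> {
        if (key.topicIdPartition().topicPartition().equals(tp)
                && sharePartition.leaderEpoch() < newEpoch) {      // leaderEpoch() accessor assumed
            partitionCacheMap.remove(key, sharePartition);          // remove only if still the same instance
        }
    });
};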

// The partition initialization failed, so complete the request with the exception.
// The server should not be in this state, so log the error on broker and surface the same
// to the client. The broker should not be in this state, investigate the root cause of the error.
log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable);
}

private void handleSharePartitionException(
Collaborator:

I suggest "fenced" should appear in the method name. This is essentially seeing if the exception indicates fencing and then discarding the share partition from the cache.

Collaborator Author:

My idea was to have a general exception-handling method, as we might find other exceptions that need some handling as well. Do you think we should strictly have a fenced-specific handling method?

Collaborator:

My point was that you're really only handling a couple of exceptions at the moment, and they're both fencing-related. Anyway, just a suggestion.

Collaborator Author:

Thanks, I have changed the method name. We might move to a generic one if required in the future.

SharePartitionKey sharePartitionKey,
Throwable throwable
) {
Contributor:

Should we include UNKNOWN_TOPIC_OR_PARTITION below too?

Collaborator Author:

Added.

if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
partitionCacheMap.remove(sharePartitionKey);
Collaborator:

Does the SharePartition need to be put into a fenced state? Removing it from the cache is good for future requests, but is that enough for the object which is already in existence?

Collaborator Author:

So we remove a share partition from the cache in two places: 1) when initialization fails, and 2) when a fenced error occurs.
For 1, it's safe as the partition is still in the initialization state.

For 2, I was of mixed opinion. As all interactions with a share partition currently happen by fetching the instance from the cache, once it is removed or re-initialized the new state should be picked up. But if an old share partition instance is already held by some other thread, then acknowledge will fail anyway, though a fetch could still succeed. Do you think it would be sensible to have another state in SharePartition, such as Fenced, which once set prevents the fetch lock on that share partition from being acquired? Do you think we should have an active-status check on all SharePartition APIs as well?
cc: @adixitconfluent

Contributor (@adixitconfluent, Oct 25, 2024):

I think it's better to have a new state. Should we have a state such as TERMINATED/FENCED, because a share partition should also be removed from the SPM in case the topic partition is deleted or becomes a follower (https://issues.apache.org/jira/browse/KAFKA-17783)? WDYT @apoorvmittal10 @AndrewJSchofield

Collaborator:

Yes, I think these states sound sensible.

Contributor (quoting the question above):

Do you think we should have an active status check on all Share Partition

Yes, I agree on this as well

Contributor:

Do we have a state machine written down somewhere for SharePartition?

Collaborator Author:

Hmmm, not yet. Let me write down the state transitions in a single place where we can see them all.
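
(As a rough illustration of the state idea being discussed: a fencing flag could be checked before the fetch lock is granted, so that a stale instance already held by another thread can no longer serve fetches. Everything below — the fenced flag, markFenced(), and the fetchLock field — is a hypothetical sketch, not code from this PR.)

// Hypothetical sketch of a fenced state inside SharePartition; names are illustrative.
private volatile boolean fenced = false;

// Could be called when the persister or replica manager reports NOT_LEADER_OR_FOLLOWER
// or FENCED_STATE_EPOCH for this share partition.
void markFenced() {
    fenced = true;
}

// Guard the fetch lock so a fenced share partition stops serving fetches.
boolean maybeAcquireFetchLock() {
    if (fenced) {
        return false;
    }
    return fetchLock.compareAndSet(false, true);    // fetchLock: AtomicBoolean, assumed
}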

future.completeExceptionally(throwable);
return;
}
}

// The partition initialization failed, so complete the request with the exception.
// The server should not be in this state, so log the error on broker and surface the same
// to the client. As of now this state is in-recoverable for the broker, and we should
// investigate the root cause of the error.
log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
future.completeExceptionally(throwable);
private void maybeCompleteShareFetchExceptionally(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
if (!future.isDone()) {
Errors error = Errors.forException(throwable);
future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
tp -> tp, tp -> new PartitionData().setErrorCode(error.code()).setErrorMessage(error.message()))));
Contributor:

We lose the error message from the throwable when converting it to Errors.

Collaborator Author:

Done.
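
(The fix acknowledged here presumably keeps the original exception's message rather than the generic mapped one; a small sketch of the difference, reusing the PartitionData builder from this hunk — the exact final code may differ.)

// Sketch: preserve the throwable's message instead of the generic Errors message.
Errors error = Errors.forException(throwable);
PartitionData partitionData = new PartitionData()
    .setErrorCode(error.code())
    .setErrorMessage(throwable.getMessage() != null ? throwable.getMessage() : error.message());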

}
}

private int leaderEpoch(TopicPartition tp) {
Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp);
Collaborator:

I've checked the error codes here and the KIP looks right to me.

if (partitionOrError.isLeft()) {
log.debug("Failed to get partition leader for topic partition: {}-{} due to error: {}",
tp.topic(), tp.partition(), partitionOrError.left().get());
throw partitionOrError.left().get().exception();
}

Partition partition = partitionOrError.right().get();
if (!partition.isLeader()) {
log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition());
throw new NotLeaderOrFollowerException();
}
return partition.getLeaderEpoch();
}

private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4249,7 +4249,8 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchMinBytes,
fetchMaxBytes,
FetchIsolation.HIGH_WATERMARK,
clientMetadata
clientMetadata,
true
)

// call the share partition manager to fetch messages from the local replica.