From d670f683384a91409d470986e75246b7509b095d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 16 Apr 2025 18:51:55 +0100 Subject: [PATCH] KAFKA-19164: Keep track of groups when deleting transactional offsets When deleting pending transactional offsets, we must preserve the list of groups associated with the producer ID, otherwise we cannot clean up the list of pending transactions for the group once the transaction is committed or aborted. --- .../group/OffsetMetadataManager.java | 15 +++-- .../group/OffsetMetadataManagerTest.java | 61 +++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 2b50071a7f771..e50ec78298bac 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -198,12 +198,19 @@ public OffsetMetadataManager build() { private final TimelineHashMap> openTransactionsByGroup; private class Offsets { + /** + * Whether to preserve empty entries for groups when removing offsets. + * We use this to keep track of the groups associated with pending transactions. + */ + private final boolean preserveGroups; + /** * The offsets keyed by group id, topic name and partition id. */ private final TimelineHashMap>> offsetsByGroup; - private Offsets() { + private Offsets(boolean preserveGroups) { + this.preserveGroups = preserveGroups; this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); } @@ -256,7 +263,7 @@ private OffsetAndMetadata remove( if (partitionOffsets.isEmpty()) topicOffsets.remove(topic); - if (topicOffsets.isEmpty()) + if (!preserveGroups && topicOffsets.isEmpty()) offsetsByGroup.remove(groupId); return removedValue; @@ -278,7 +285,7 @@ private OffsetAndMetadata remove( this.groupMetadataManager = groupMetadataManager; this.config = config; this.metrics = metrics; - this.offsets = new Offsets(); + this.offsets = new Offsets(false); this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0); this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); } @@ -995,7 +1002,7 @@ public void replay( // offsets store. Pending offsets there are moved to the main store when // the transaction is committed; or removed when the transaction is aborted. pendingTransactionalOffsets - .computeIfAbsent(producerId, __ -> new Offsets()) + .computeIfAbsent(producerId, __ -> new Offsets(true)) .put( groupId, topic, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 6f788d84fd009..65d8f5514dec1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -2593,6 +2593,67 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() { assertEquals(List.of(), records); } + @Test + public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .withOffsetsRetentionMinutes(1) + .build(); + + long commitTimestamp = context.time.milliseconds(); + + context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); + context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.of( + new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); + when(group.isSubscribedToTopic("foo")).thenReturn(false); + + // Delete the pending transactional offset. + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List.of( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("foo") + .setPartitions(List.of( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1) + )) + ).iterator()); + CoordinatorResult result = context.deleteOffsets( + new OffsetDeleteRequestData() + .setGroupId("group-id") + .setTopics(requestTopicCollection) + ); + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1) + ); + assertEquals(expectedRecords, result.records()); + + context.time.sleep(Duration.ofMinutes(1).toMillis()); + + // The group should not be deleted because it has a pending transaction. + expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0) + ); + List records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + + // Commit the ongoing transaction. + context.replayEndTransactionMarker(10L, TransactionResult.COMMIT); + + // The group should be deletable now. + context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); + context.time.sleep(Duration.ofMinutes(1).toMillis()); + + records = new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + } + private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse( int partition, long offset,