diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx index b4ee9fdf4..47f0c824c 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx @@ -189,12 +189,12 @@ export function ConsumerGroupsTable({ ); case "topics": - const allTopics: { topicId: string; topicName: string }[] = []; - - row.attributes.offsets?.forEach((a) => { - allTopics.push({ topicId: a.topicId, topicName: a.topicName }); - }); - + const allTopics: { topicId: string; topicName: string; }[] = []; + row.attributes.members?. + flatMap((m) => m.assignments ?? []). + forEach(a => allTopics.push({ topicId: a.topicId, topicName: a.topicName })); + row.attributes.offsets?. + forEach(a => allTopics.push({ topicId: a.topicId, topicName: a.topicName })); return ( diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx index c86f85974..c01d0c5e3 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx @@ -115,10 +115,11 @@ export function ResetConsumerOffset({ const handleSave = async () => { - setIsLoading(true); setError(undefined); + setIsLoading(true); + try { - let offsets: Array<{ + const offsets: Array<{ topicId: string; partition?: number; offset: string | number; @@ -126,54 +127,57 @@ export function ResetConsumerOffset({ }> = []; if (selectedConsumerTopic === "allTopics") { - offsets = topics.flatMap((topic) => - partitions.map((partition) => ({ - topicId: topic.topicId, - partition: partition, - offset: selectedOffset === "custom" || selectedOffset === "specificDateTime" - ? selectDateTimeFormat === "Epoch" - ? convertEpochToISO(String(offset.offset)) - : offset.offset - : selectedOffset, - })) - ); - + topics.forEach((topic) => { + partitions.forEach((partition) => { + offsets.push({ + topicId: topic.topicId, + partition: partition, + offset: selectedOffset === "custom" || selectedOffset === "specificDateTime" + ? selectDateTimeFormat === "Epoch" + ? convertEpochToISO(String(offset.offset)) + : offset.offset + : selectedOffset, + }); + }); + }); } else if (selectedConsumerTopic === "selectedTopic") { const uniquePartitions = new Set( partitions.map(partition => selectedPartition === "allPartitions" ? partition : offset.partition) ); - offsets = Array.from(uniquePartitions).map((partition) => ({ - topicId: offset.topicId, - partition, - offset: selectedOffset === "custom" || selectedOffset === "specificDateTime" - ? selectDateTimeFormat === "Epoch" - ? convertEpochToISO(String(offset.offset)) - : offset.offset - : selectedOffset, - })); + Array.from(uniquePartitions).forEach((partition) => { + offsets.push({ + topicId: offset.topicId, + partition, + offset: selectedOffset === "custom" || selectedOffset === "specificDateTime" + ? selectDateTimeFormat === "Epoch" + ? convertEpochToISO(String(offset.offset)) + : offset.offset + : selectedOffset, + }); + }); } - offsets = offsets.filter((value, index, self) => + // Remove duplicate entries + const uniqueOffsets = offsets.filter((value, index, self) => index === self.findIndex((t) => ( t.topicId === value.topicId && t.partition === value.partition )) ); - const success = await updateConsumerGroup(kafkaId, consumerGroupName, offsets); + const success = await updateConsumerGroup(kafkaId, consumerGroupName, uniqueOffsets); if (success === true) { closeResetOffset(); } else { - setIsLoading(false); const errorMessages = (success as UpdateConsumerGroupErrorSchema)?.errors.map((err) => err.detail) || []; const errorMessage = errorMessages.length > 0 ? errorMessages[0] - : "Failed to update consumer group" - setError(errorMessage) + : "Failed to update consumer group"; + setError(errorMessage); } } catch (e: unknown) { - setError("unknown"); + setError("Unknown error occurred"); } finally { setIsLoading(false); }