Skip to content

Commit

Permalink
Incorporate review comment
Browse files Browse the repository at this point in the history
Signed-off-by: hemahg <[email protected]>
  • Loading branch information
hemahg committed Sep 5, 2024
1 parent 8d4767a commit b67171e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ export function ConsumerGroupsTable({
</Td>
);
case "topics":
const allTopics: { topicId: string; topicName: string }[] = [];

row.attributes.offsets?.forEach((a) => {
allTopics.push({ topicId: a.topicId, topicName: a.topicName });
});

const allTopics: { topicId: string; topicName: string; }[] = [];
row.attributes.members?.
flatMap((m) => m.assignments ?? []).
forEach(a => allTopics.push({ topicId: a.topicId, topicName: a.topicName }));
row.attributes.offsets?.
forEach(a => allTopics.push({ topicId: a.topicId, topicName: a.topicName }));
return (
<Td key={key} dataLabel={t("ConsumerGroupsTable.topics")}>
<LabelGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,65 +115,69 @@ export function ResetConsumerOffset({


const handleSave = async () => {
setIsLoading(true);
setError(undefined);
setIsLoading(true);

try {
let offsets: Array<{
const offsets: Array<{
topicId: string;
partition?: number;
offset: string | number;
metadata?: string;
}> = [];

if (selectedConsumerTopic === "allTopics") {
offsets = topics.flatMap((topic) =>
partitions.map((partition) => ({
topicId: topic.topicId,
partition: partition,
offset: selectedOffset === "custom" || selectedOffset === "specificDateTime"
? selectDateTimeFormat === "Epoch"
? convertEpochToISO(String(offset.offset))
: offset.offset
: selectedOffset,
}))
);

topics.forEach((topic) => {
partitions.forEach((partition) => {
offsets.push({
topicId: topic.topicId,
partition: partition,
offset: selectedOffset === "custom" || selectedOffset === "specificDateTime"
? selectDateTimeFormat === "Epoch"
? convertEpochToISO(String(offset.offset))
: offset.offset
: selectedOffset,
});
});
});
} else if (selectedConsumerTopic === "selectedTopic") {
const uniquePartitions = new Set(
partitions.map(partition => selectedPartition === "allPartitions" ? partition : offset.partition)
);

offsets = Array.from(uniquePartitions).map((partition) => ({
topicId: offset.topicId,
partition,
offset: selectedOffset === "custom" || selectedOffset === "specificDateTime"
? selectDateTimeFormat === "Epoch"
? convertEpochToISO(String(offset.offset))
: offset.offset
: selectedOffset,
}));
Array.from(uniquePartitions).forEach((partition) => {
offsets.push({
topicId: offset.topicId,
partition,
offset: selectedOffset === "custom" || selectedOffset === "specificDateTime"
? selectDateTimeFormat === "Epoch"
? convertEpochToISO(String(offset.offset))
: offset.offset
: selectedOffset,
});
});
}

offsets = offsets.filter((value, index, self) =>
// Remove duplicate entries
const uniqueOffsets = offsets.filter((value, index, self) =>
index === self.findIndex((t) => (
t.topicId === value.topicId && t.partition === value.partition
))
);

const success = await updateConsumerGroup(kafkaId, consumerGroupName, offsets);
const success = await updateConsumerGroup(kafkaId, consumerGroupName, uniqueOffsets);

if (success === true) {
closeResetOffset();
} else {
setIsLoading(false);
const errorMessages = (success as UpdateConsumerGroupErrorSchema)?.errors.map((err) => err.detail) || [];
const errorMessage = errorMessages.length > 0
? errorMessages[0]
: "Failed to update consumer group"
setError(errorMessage)
: "Failed to update consumer group";
setError(errorMessage);
}
} catch (e: unknown) {
setError("unknown");
setError("Unknown error occurred");
} finally {
setIsLoading(false);
}
Expand Down

0 comments on commit b67171e

Please sign in to comment.