Skip to content

Commit

Permalink
Consumer group reset offset (#960)
Browse files Browse the repository at this point in the history
* Consumer group reset offset

Signed-off-by: hemahg <[email protected]>

* Add bootstarp url

Signed-off-by: hemahg <[email protected]>

* handle bootstrap server

Signed-off-by: hemahg <[email protected]>

* remove delete button

Signed-off-by: hemahg <[email protected]>

* Add more states to consumer group state schema

Signed-off-by: hemahg <[email protected]>

* Minor refactor

Signed-off-by: hemahg <[email protected]>

---------

Signed-off-by: hemahg <[email protected]>
  • Loading branch information
hemahg authored Sep 13, 2024
1 parent 8e3374a commit cb88f98
Show file tree
Hide file tree
Showing 27 changed files with 4,089 additions and 88 deletions.
92 changes: 90 additions & 2 deletions ui/api/consumerGroups/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
import { getHeaders } from "@/api/api";
import {
ConsumerGroup,
ConsumerGroupDryrunResponseSchema,
ConsumerGroupResponseSchema,
ConsumerGroupsResponse,
ConsumerGroupsResponseSchema,
ConsumerGroupState,
DryrunResponse,
UpdateConsumerGroupErrorSchema,
} from "@/api/consumerGroups/schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { logger } from "@/utils/logger";

const log = logger.child({ module: "topics-api" });
const log = logger.child({ module: "consumergroup-api" });

export async function getConsumerGroup(
kafkaId: string,
Expand All @@ -32,6 +36,8 @@ export async function getConsumerGroups(
kafkaId: string,
params: {
fields?: string;
id?: string;
state?: ConsumerGroupState[];
pageSize?: number;
pageCursor?: string;
sort?: string;
Expand All @@ -43,8 +49,12 @@ export async function getConsumerGroups(
filterUndefinedFromObj({
"fields[consumerGroups]":
params.fields ?? "state,simpleConsumerGroup,members,offsets",
"filter[id]": params.id ? `eq,${params.id}` : undefined,
// TODO: pass filter from UI
"filter[state]": "in,STABLE,PREPARING_REBALANCE,COMPLETING_REBALANCE",
"filter[state]":
params.state && params.state.length > 0
? `in,${params.state.join(",")}`
: undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
Expand Down Expand Up @@ -107,3 +117,81 @@ export async function getTopicConsumerGroups(
log.debug({ url, rawData }, "getTopicConsumerGroups response");
return ConsumerGroupsResponseSchema.parse(rawData);
}

export async function updateConsumerGroup(
kafkaId: string,
consumerGroupId: string,
offsets: Array<{
topicId: string;
partition?: number;
offset: string | number;
metadata?: string;
}>,
): Promise<boolean | UpdateConsumerGroupErrorSchema> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${consumerGroupId}`;
const body = {
data: {
type: "consumerGroups",
id: consumerGroupId,
attributes: {
offsets,
},
},
};

log.debug({ url, body }, "calling updateConsumerGroup");

try {
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});

log.debug({ status: res.status }, "updateConsumerGroup response");

if (res.status === 204) {
return true;
} else {
const rawData = await res.json();
return UpdateConsumerGroupErrorSchema.parse(rawData);
}
} catch (e) {
log.error(e, "updateConsumerGroup unknown error");
console.error("Unknown error occurred:", e);
return false;
}
}

export async function getDryrunResult(
kafkaId: string,
consumerGroupId: string,
offsets: Array<{
topicId: string;
partition?: number;
offset: string | number;
metadata?: string;
}>,
): Promise<DryrunResponse> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${consumerGroupId}`;
const body = {
meta: {
dryRun: true,
},
data: {
type: "consumerGroups",
id: consumerGroupId,
attributes: {
offsets,
},
},
};
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});
const rawData = await res.json();
log.debug({ url, rawData }, "getConsumerGroup response");
return ConsumerGroupDryrunResponseSchema.parse(rawData).data;
}
61 changes: 60 additions & 1 deletion ui/api/consumerGroups/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ import { ApiError } from "@/api/api";
import { NodeSchema } from "@/api/kafka/schema";
import { z } from "zod";

const ConsumerGroupStateSchema = z.union([
z.literal("UNKNOWN"),
z.literal("PREPARING_REBALANCE"),
z.literal("COMPLETING_REBALANCE"),
z.literal("STABLE"),
z.literal("DEAD"),
z.literal("EMPTY"),
z.literal("ASSIGNING"),
z.literal("RECONCILING"),
]);

const OffsetAndMetadataSchema = z.object({
topicId: z.string(),
topicName: z.string(),
Expand Down Expand Up @@ -29,7 +40,7 @@ export const ConsumerGroupSchema = z.object({
type: z.literal("consumerGroups"),
attributes: z.object({
simpleConsumerGroup: z.boolean().optional(),
state: z.string().optional(),
state: ConsumerGroupStateSchema,
members: z.array(MemberDescriptionSchema).optional(),
partitionAssignor: z.string().nullable().optional(),
coordinator: NodeSchema.nullable().optional(),
Expand All @@ -38,7 +49,17 @@ export const ConsumerGroupSchema = z.object({
errors: z.array(ApiError).optional(),
}),
});

const DryrunOffsetSchema = z.object({
topicId: z.string(),
topicName: z.string(),
partition: z.number(),
offset: z.number(),
metadata: z.string(),
});

export type ConsumerGroup = z.infer<typeof ConsumerGroupSchema>;
export type ConsumerGroupState = z.infer<typeof ConsumerGroupStateSchema>;

export const ConsumerGroupsResponseSchema = z.object({
meta: z.object({
Expand All @@ -55,6 +76,34 @@ export const ConsumerGroupsResponseSchema = z.object({
}),
data: z.array(ConsumerGroupSchema),
});

export const DryrunSchema = z.object({
id: z.string(),
type: z.literal("consumerGroups"),
attributes: z.object({
state: ConsumerGroupStateSchema,
members: z.array(MemberDescriptionSchema).optional(),
offsets: z.array(DryrunOffsetSchema).optional(),
}),
});

export const UpdateConsumerGroupErrorSchema = z.object({
errors: z.array(
z.object({
id: z.string(),
status: z.string(),
code: z.string(),
title: z.string(),
detail: z.string(),
source: z
.object({
pointer: z.string().optional(),
})
.optional(),
}),
),
});

export type ConsumerGroupsResponse = z.infer<
typeof ConsumerGroupsResponseSchema
>;
Expand All @@ -65,3 +114,13 @@ export const ConsumerGroupResponseSchema = z.object({
export type ConsumerGroupResponse = z.infer<
typeof ConsumerGroupsResponseSchema
>;

export type UpdateConsumerGroupErrorSchema = z.infer<
typeof UpdateConsumerGroupErrorSchema
>;

export const ConsumerGroupDryrunResponseSchema = z.object({
data: DryrunSchema,
});

export type DryrunResponse = z.infer<typeof DryrunSchema>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"use client";

import { Button, Tooltip } from "@/libs/patternfly/react-core";
import { useTranslations } from "next-intl";
import { useRouter } from "next/navigation";

export function ConsumerGroupActionButton({
disabled,
kafkaId,
consumerGroupName,
}: {
disabled: boolean;
kafkaId: string;
consumerGroupName: string;
}) {
const t = useTranslations();
const router = useRouter();

const onClickResetOffset = () => {
router.push(
`/kafka/${kafkaId}/consumer-groups/${consumerGroupName}/reset-offset`,
);
};

return (
<Tooltip
key={"reset"}
content={
"It is possible to reset the offset only on stopped consumer groups"
}
>
<Button
isDisabled={disabled}
aria-disabled={disabled}
id={"reset"}
onClick={onClickResetOffset}
>
{t("ConsumerGroup.reset_offset")}
</Button>
</Tooltip>
// <Tooltip
// key={"delete"}
// content={"It is possible to delete only stopped consumer groups"}
// >
// <Button
// variant={"danger"}
// aria-disabled={disabled}
// isDisabled={disabled}
// >
// {t("ConsumerGroup.delete")}
// </Button>
// </Tooltip>,
);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { getConsumerGroup } from "@/api/consumerGroups/actions";
import { KafkaConsumerGroupMembersParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/KafkaConsumerGroupMembers.params";
import { AppHeader } from "@/components/AppHeader";
import { Button, Tooltip } from "@/libs/patternfly/react-core";
import { Suspense } from "react";
import { useTranslations } from "next-intl";
import { ConsumerGroupActionButton } from "./ConsumerGroupActionButton";

export const fetchCache = "force-cache";

Expand Down Expand Up @@ -44,28 +44,12 @@ function Header({
<AppHeader
title={decodeURIComponent(groupId) === "+" ? <i>Empty Name</i> : groupId}
actions={[
<Tooltip
key={"reset"}
content={
"It is possible to reset the offset only on stopped consumer groups"
}
>
<Button isDisabled={disabled} aria-disabled={disabled} id={"reset"}>
{t("ConsumerGroup.reset_offset")}
</Button>
</Tooltip>,
<Tooltip
key={"delete"}
content={"It is possible to delete only stopped consumer groups"}
>
<Button
variant={"danger"}
aria-disabled={disabled}
isDisabled={disabled}
>
{t("ConsumerGroup.delete")}
</Button>
</Tooltip>,
<ConsumerGroupActionButton
key={"consumergGroupActionButton"}
disabled={disabled}
kafkaId={kafkaId}
consumerGroupName={groupId}
/>,
]}
/>
);
Expand Down
Loading

0 comments on commit cb88f98

Please sign in to comment.