Skip to content

Commit

Permalink
feat: kafka Rebalance list (#1070)
Browse files Browse the repository at this point in the history
* Rebalance list

Signed-off-by: hemahg <[email protected]>

* Add refresh modal

Signed-off-by: hemahg <[email protected]>

* Incorporate review comments

Signed-off-by: hemahg <[email protected]>

* make mode parameter capital

* update kebab menu enable/disable

Signed-off-by: hemahg <[email protected]>

---------

Signed-off-by: hemahg <[email protected]>
  • Loading branch information
hemahg authored Oct 7, 2024
1 parent eeb757f commit 570345f
Show file tree
Hide file tree
Showing 26 changed files with 1,796 additions and 10 deletions.
2 changes: 1 addition & 1 deletion ui/api/kafka/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function getKafkaCluster(
): Promise<ClusterDetail | null> {
const sp = new URLSearchParams({
"fields[kafkas]":
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools",
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools,cruiseControlEnabled",
});
const kafkaClusterQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`;
Expand Down
1 change: 1 addition & 0 deletions ui/api/kafka/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const ClusterDetailSchema = z.object({
nodes: z.array(NodeSchema),
controller: NodeSchema,
authorizedOperations: z.array(z.string()),
cruiseControlEnabled: z.boolean().optional(),
listeners: z
.array(
z.object({
Expand Down
92 changes: 92 additions & 0 deletions ui/api/rebalance/actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"use server";
import { logger } from "@/utils/logger";
import {
RebalanceResponse,
RebalanceResponseSchema,
RebalanceSchema,
RebalancesResponse,
RebalanceStatus,
} from "./schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { getHeaders } from "@/api/api";
import { RebalanceMode } from "./schema";

const log = logger.child({ module: "rebalance-api" });

export async function getRebalancesList(
kafkaId: string,
params: {
name?: string;
mode?: RebalanceMode[];
status?: RebalanceStatus[];
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
},
): Promise<RebalancesResponse> {
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[kafkaRebalances]":
"name,namespace,creationTimestamp,status,mode,brokers,optimizationResult",
"filter[name]": params.name ? `like,*${params.name}*` : undefined,
"filter[status]":
params.status && params.status.length > 0
? `in,${params.status.join(",")}`
: undefined,
"filter[mode]":
params.mode && params.mode.length > 0
? `in,${params.mode.join(",")}`
: undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
? (params.sortDir !== "asc" ? "-" : "") + params.sort
: undefined,
}),
);
const rebalanceQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances?${rebalanceQuery}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: ["rebalances"],
},
});

log.debug({ url }, "getRebalanceList");
const rawData = await res.json();
log.trace({ url, rawData }, "getRebalanceList response");
return RebalanceResponseSchema.parse(rawData);
}

export async function getRebalanceDetails(
kafkaId: string,
rebalanceId: string,
action?: string,
): Promise<RebalanceResponse | boolean> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances/${rebalanceId}`;
const decodedRebalanceId = decodeURIComponent(rebalanceId);
const body = {
data: {
type: "kafkaRebalances",
id: decodedRebalanceId,
meta: {
action: action,
},
attributes: {},
},
};
log.debug({ url }, "Fetching rebalance details");
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});
if (action) {
return res.ok;
} else {
const rawData = await res.json();
return RebalanceSchema.parse(rawData.data);
}
}
102 changes: 102 additions & 0 deletions ui/api/rebalance/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { z } from "zod";

const RebalanceStatusSchema = z.union([
z.literal("New"),
z.literal("PendingProposal"),
z.literal("ProposalReady"),
z.literal("Rebalancing"),
z.literal("Stopped"),
z.literal("NotReady"),
z.literal("Ready"),
z.literal("ReconciliationPaused"),
]);

const ModeSchema = z.union([
z.literal("full"),
z.literal("add-brokers"),
z.literal("remove-brokers"),
]);

const OptimizationResultSchema = z.object({
numIntraBrokerReplicaMovements: z.number().optional(),
numReplicaMovements: z.number().optional(),
onDemandBalancednessScoreAfter: z.number().optional(),
afterBeforeLoadConfigMap: z.string().optional(),
intraBrokerDataToMoveMB: z.number().optional(),
monitoredPartitionsPercentage: z.number().optional(),
provisionRecommendation: z.string().optional(),
excludedBrokersForReplicaMove: z.array(z.string()).nullable().optional(),
excludedBrokersForLeadership: z.array(z.string()).nullable().optional(),
provisionStatus: z.string().optional(),
onDemandBalancednessScoreBefore: z.number().optional(),
recentWindows: z.number().optional(),
dataToMoveMB: z.number().optional(),
excludedTopics: z.array(z.string()).nullable().optional(),
numLeaderMovements: z.number().optional(),
});

export const RebalanceSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z
.object({
autoApproval: z.boolean().optional(),
allowedActions: z.array(z.string()),
})
.optional(),
attributes: z.object({
name: z.string(),
namespace: z.string(),
creationTimestamp: z.string(),
status: RebalanceStatusSchema,
mode: ModeSchema,
brokers: z.array(z.number()).nullable(),
sessionId: z.string().nullable(),
optimizationResult: OptimizationResultSchema,
}),
});

const RebalancesListSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z.object({
page: z.object({
cursor: z.string(),
}),
autoApproval: z.boolean(),
managed: z.boolean().optional(),
allowedActions: z.array(z.string()),
}),
attributes: RebalanceSchema.shape.attributes.pick({
name: true,
status: true,
creationTimestamp: true,
mode: true,
brokers: true,
optimizationResult: true
}),
});

export const RebalanceResponseSchema = z.object({
meta: z.object({
page: z.object({
total: z.number(),
pageNumber: z.number().optional(),
}),
}),
links: z.object({
first: z.string().nullable(),
prev: z.string().nullable(),
next: z.string().nullable(),
last: z.string().nullable(),
}),
data: z.array(RebalancesListSchema),
});
export type RebalanceList = z.infer<typeof RebalancesListSchema>;
export type RebalancesResponse = z.infer<typeof RebalanceResponseSchema>;

export type RebalanceResponse = z.infer<typeof RebalanceSchema>;

export type RebalanceStatus = z.infer<typeof RebalanceStatusSchema>;

export type RebalanceMode = z.infer<typeof ModeSchema>;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BreadcrumbItem } from "@/libs/patternfly/react-core";

export default function TopicsActiveBreadcrumb() {
export default function NodesActiveBreadcrumb() {
return <BreadcrumbItem showDivider={true}>Brokers</BreadcrumbItem>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { BreadcrumbItem } from "@/libs/patternfly/react-core";

export default function RebalanceActiveBreadcrumb() {
return <BreadcrumbItem showDivider={true}>Brokers</BreadcrumbItem>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,49 @@ import { getKafkaCluster } from "@/api/kafka/actions";
import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params";
import { AppHeader } from "@/components/AppHeader";
import { Number } from "@/components/Format/Number";
import { Label, Spinner, Split, SplitItem } from "@/libs/patternfly/react-core";
import { NavItemLink } from "@/components/Navigation/NavItemLink";
import {
Label,
Nav,
NavList,
PageNavigation,
Spinner,
Split,
SplitItem,
} from "@/libs/patternfly/react-core";
import { CheckCircleIcon } from "@/libs/patternfly/react-icons";
import { Suspense } from "react";

export default function NodesHeader({ params }: { params: KafkaParams }) {
return (
<Suspense fallback={<Header />}>
<Suspense
fallback={<Header kafkaId={undefined} cruiseControlEnable={false} />}
>
<ConnectedHeader params={params} />
</Suspense>
);
}

async function ConnectedHeader({ params }: { params: KafkaParams }) {
const cluster = await getKafkaCluster(params.kafkaId);
return <Header total={cluster?.attributes.nodes.length || 0} />;
return (
<Header
total={cluster?.attributes.nodes.length || 0}
kafkaId={cluster?.id}
cruiseControlEnable={cluster?.attributes.cruiseControlEnabled || false}
/>
);
}

function Header({ total }: { total?: number }) {
function Header({
total,
kafkaId,
cruiseControlEnable,
}: {
total?: number;
kafkaId: string | undefined;
cruiseControlEnable: boolean;
}) {
return (
<AppHeader
title={
Expand All @@ -41,6 +66,22 @@ function Header({ total }: { total?: number }) {
</SplitItem>
</Split>
}
navigation={
<PageNavigation>
<Nav aria-label="Node navigation" variant="tertiary">
<NavList>
<NavItemLink url={`/kafka/${kafkaId}/nodes`}>
Overview
</NavItemLink>
{cruiseControlEnable && (
<NavItemLink url={`/kafka/${kafkaId}/nodes/rebalances`}>
Rebalance
</NavItemLink>
)}
</NavList>
</Nav>
</PageNavigation>
}
/>
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { getKafkaCluster } from "@/api/kafka/actions";
import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params";
import { AppHeader } from "@/components/AppHeader";
import { Number } from "@/components/Format/Number";
import { NavItemLink } from "@/components/Navigation/NavItemLink";
import {
Label,
Nav,
NavList,
PageNavigation,
Spinner,
Split,
SplitItem,
} from "@/libs/patternfly/react-core";
import { CheckCircleIcon } from "@/libs/patternfly/react-icons";
import { Suspense } from "react";

export default function NodesHeader({ params }: { params: KafkaParams }) {
return (
<Suspense
fallback={<Header kafkaId={undefined} cruiseControlEnable={false} />}
>
<ConnectedHeader params={params} />
</Suspense>
);
}

async function ConnectedHeader({ params }: { params: KafkaParams }) {
const cluster = await getKafkaCluster(params.kafkaId);
return (
<Header
total={cluster?.attributes.nodes.length || 0}
kafkaId={cluster?.id}
cruiseControlEnable={cluster?.attributes.cruiseControlEnabled || false}
/>
);
}

function Header({
total,
kafkaId,
cruiseControlEnable,
}: {
total?: number;
kafkaId: string | undefined;
cruiseControlEnable: boolean;
}) {
return (
<AppHeader
title={
<Split hasGutter={true}>
<SplitItem>Brokers</SplitItem>
<SplitItem>
<Label
color={"green"}
icon={
total === undefined ? (
<Spinner size={"sm"} />
) : (
<CheckCircleIcon />
)
}
>
{total && <Number value={total} />}
</Label>
</SplitItem>
</Split>
}
navigation={
<PageNavigation>
<Nav aria-label="Node navigation" variant="tertiary">
<NavList>
<NavItemLink url={`/kafka/${kafkaId}/nodes`}>
Overview
</NavItemLink>
{cruiseControlEnable && (
<NavItemLink url={`/kafka/${kafkaId}/nodes/rebalances`}>
Rebalance
</NavItemLink>
)}
</NavList>
</Nav>
</PageNavigation>
}
/>
);
}
Loading

0 comments on commit 570345f

Please sign in to comment.