Single cluster support, with dynamic authentication #941

Merged · 18 commits · Aug 9, 2024

Changes from all commits
7 changes: 5 additions & 2 deletions ui/api/api.ts
@@ -3,11 +3,14 @@ import { z } from "zod";

 export async function getHeaders(): Promise<Record<string, string>> {
   const user = await getUser();
-  return {
+  let headers: Record<string, string> = {
     Accept: "application/json",
-    Authorization: `Bearer ${user.accessToken}`,
     "Content-Type": "application/json",
   };
+  if (user.authorization) {
+    headers["Authorization"] = user.authorization;
+  }
+  return headers;
 }

 export const ApiError = z.object({
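
Note: `getHeaders()` now attaches `Authorization` only when the session already carries a ready-made value, instead of always sending `Bearer ${user.accessToken}`, so anonymous access sends no credential at all. A minimal standalone sketch of the resulting behaviour; the `SessionUser` shape here is an assumption standing in for whatever `getUser()` actually returns:

// Standalone sketch; `SessionUser` is a hypothetical stand-in for the
// session type returned by getUser(), which is not part of this diff.
type SessionUser = { authorization?: string };

function buildHeaders(user: SessionUser): Record<string, string> {
  let headers: Record<string, string> = {
    Accept: "application/json",
    "Content-Type": "application/json",
  };
  // Previously always a bearer token; now the header is attached only when
  // the login flow stored one (e.g. "Basic ..." or "Bearer ...").
  if (user.authorization) {
    headers["Authorization"] = user.authorization;
  }
  return headers;
}

console.log(buildHeaders({})); // anonymous: no Authorization key at all
console.log(buildHeaders({ authorization: "Basic dXNlcjpwYXNz" }));
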
59 changes: 34 additions & 25 deletions ui/api/consumerGroups/actions.ts
@@ -37,31 +37,40 @@ export async function getConsumerGroups(
     sort?: string;
     sortDir?: string;
   },
-): Promise<ConsumerGroupsResponse> {
-  const sp = new URLSearchParams(
-    filterUndefinedFromObj({
-      "fields[consumerGroups]": params.fields ?? "state,simpleConsumerGroup,members,offsets",
-      // TODO: pass filter from UI
-      "filter[state]": "in,STABLE,PREPARING_REBALANCE,COMPLETING_REBALANCE",
-      "page[size]": params.pageSize,
-      "page[after]": params.pageCursor,
-      sort: params.sort
-        ? (params.sortDir !== "asc" ? "-" : "") + params.sort
-        : undefined,
-    }),
-  );
-  const cgQuery = sp.toString();
-  const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups?${cgQuery}`;
-  const res = await fetch(url, {
-    headers: await getHeaders(),
-    next: {
-      tags: [`consumer-groups`],
-    },
-  });
-  log.debug({ url }, "getConsumerGroups");
-  const rawData = await res.json();
-  log.debug({ url, rawData }, "getConsumerGroups response");
-  return ConsumerGroupsResponseSchema.parse(rawData);
+): Promise<ConsumerGroupsResponse | null> {
+  try {
+    const sp = new URLSearchParams(
+      filterUndefinedFromObj({
+        "fields[consumerGroups]":
+          params.fields ?? "state,simpleConsumerGroup,members,offsets",
+        // TODO: pass filter from UI
+        "filter[state]": "in,STABLE,PREPARING_REBALANCE,COMPLETING_REBALANCE",
+        "page[size]": params.pageSize,
+        "page[after]": params.pageCursor,
+        sort: params.sort
+          ? (params.sortDir !== "asc" ? "-" : "") + params.sort
+          : undefined,
+      }),
+    );
+    const cgQuery = sp.toString();
+    const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups?${cgQuery}`;
+    const res = await fetch(url, {
+      headers: await getHeaders(),
+      next: {
+        tags: [`consumer-groups`],
+      },
+    });
+    log.debug({ url }, "getConsumerGroups");
+    if (res.status === 200) {
+      const rawData = await res.json();
+      log.debug({ url, rawData }, "getConsumerGroups response");
+      return ConsumerGroupsResponseSchema.parse(rawData);
+    }
+  } catch (err) {
+    log.error(err, "getConsumerGroups");
+    throw new Error("getConsumerGroups: couldn't connect with backend");
+  }
+  return null;
 }

 export async function getTopicConsumerGroups(
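
Note: the contract now distinguishes "backend answered, but not with a usable payload" (`null`, e.g. rejected credentials) from "backend unreachable" (thrown Error). A sketch of a caller under the new contract; the helper itself is hypothetical, only the import path and signature come from this diff:

// Hypothetical caller; assumes the repo's "@/" path alias.
import { getConsumerGroups } from "@/api/consumerGroups/actions";

export async function loadConsumerGroups(kafkaId: string) {
  // Throws only if the backend cannot be reached at all.
  const response = await getConsumerGroups(kafkaId, { pageSize: 20 });
  // `null` covers any non-200 answer, e.g. expired credentials, so the
  // page can render an empty/denied state instead of crashing on parse.
  return response === null
    ? { state: "unavailable" as const }
    : { state: "ok" as const, response };
}
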
88 changes: 54 additions & 34 deletions ui/api/kafka/actions.ts
@@ -13,12 +13,12 @@ import {
 import { logger } from "@/utils/logger";
 import groupBy from "lodash.groupby";
 import { PrometheusDriver } from "prometheus-query";
-import * as cluster from "./cluster.promql";
+import * as clusterPromql from "./cluster.promql";
 import { values } from "./kpi.promql";
-import * as topic from "./topic.promql";
+import * as topicPromql from "./topic.promql";

-export type ClusterMetric = keyof typeof cluster;
-export type TopicMetric = keyof typeof topic;
+export type ClusterMetric = keyof typeof clusterPromql;
+export type TopicMetric = keyof typeof topicPromql;

 const prom = process.env.CONSOLE_METRICS_PROMETHEUS_URL
   ? new PrometheusDriver({
@@ -37,14 +37,20 @@ export async function getKafkaClusters(): Promise<ClusterList[]> {
   const url = `${process.env.BACKEND_URL}/api/kafkas?${kafkaClustersQuery}`;
   try {
     const res = await fetch(url, {
-      headers: await getHeaders(),
+      headers: {
+        Accept: "application/json",
+        "Content-Type": "application/json",
+      },
       next: {
         revalidate: 30,
       },
     });
     const rawData = await res.json();
     log.trace(rawData, "getKafkaClusters response");
     return ClustersResponseSchema.parse(rawData).data;
   } catch (err) {
     log.error(err, "getKafkaClusters");
-    return [];
+    throw new Error("getKafkaClusters: couldn't connect with backend");
   }
 }
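
Note: the cluster list is now fetched with plain anonymous headers rather than session headers, presumably because it backs the login screen before any session exists, and a connection failure now throws instead of silently yielding an empty list. A sketch of a hypothetical login-page loader built on that distinction:

// Hypothetical helper, not part of this PR; only getKafkaClusters is.
import { getKafkaClusters } from "@/api/kafka/actions";

export async function loadClustersForLogin() {
  try {
    const clusters = await getKafkaClusters();
    return { clusters, error: null };
  } catch (err) {
    // Now distinguishable from "no clusters configured", which is a
    // legitimate empty array rather than a thrown Error.
    return { clusters: [], error: "Backend unreachable" };
  }
}
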

@@ -60,33 +66,35 @@ export async function getKafkaCluster(
   try {
     const res = await fetch(url, {
       headers: await getHeaders(),
-      cache: "force-cache",
+      cache: "reload",
     });
-    const rawData = await res.json();
-    log.trace(rawData, "getKafkaCluster response");
-    return ClusterResponse.parse(rawData).data;
+    if (res.status === 200) {
+      const rawData = await res.json();
+      log.trace(rawData, "getKafkaCluster response");
+      return ClusterResponse.parse(rawData).data;
+    }
+    return null;
   } catch (err) {
     log.error({ err, clusterId }, "getKafkaCluster");
-    return null;
+    throw new Error("getKafkaCluster: couldn't connect with backend");
   }
 }

 export async function getKafkaClusterKpis(
   clusterId: string,
 ): Promise<{ cluster: ClusterDetail; kpis: ClusterKpis | null } | null> {
-  try {
-    const cluster = await getKafkaCluster(clusterId);
-    if (!cluster?.attributes?.namespace) {
-      return null;
-    }
+  const cluster = await getKafkaCluster(clusterId);

-    log.debug({ cluster, prom }, "????");
+  if (!cluster) {
+    return null;
+  }

-    if (!prom) {
-      log.debug({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
-      return { cluster, kpis: null };
-    }
+  if (!prom || !cluster.attributes.namespace) {
+    log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
+    return { cluster, kpis: null };
+  }

+  try {
     const valuesRes = await prom.instantQuery(
       values(
         cluster.attributes.namespace,
@@ -199,7 +207,7 @@ export async function getKafkaClusterKpis(
     };
   } catch (err) {
     log.error({ err, clusterId }, "getKafkaClusterKpis");
-    return null;
+    throw new Error("getKafkaClusterKpis: couldn't connect with Prometheus");
   }
 }

@@ -208,7 +216,7 @@ export async function getKafkaClusterMetrics(
   metrics: Array<ClusterMetric>,
 ): Promise<{
   cluster: ClusterDetail;
-  ranges: Record<ClusterMetric, MetricRange>;
+  ranges: Record<ClusterMetric, MetricRange> | null;
 } | null> {
   async function getRangeByNodeId(
     namespace: string,
@@ -220,7 +228,7 @@
     const end = new Date();
     const step = 60 * 1;
     const seriesRes = await prom!.rangeQuery(
-      cluster[metric](namespace, name, nodePools),
+      clusterPromql[metric](namespace, name, nodePools),
       start,
       end,
       step,
@@ -236,12 +244,18 @@
     return [metric, MetricRangeSchema.parse(serieByNode)];
   }

-  try {
-    const cluster = await getKafkaCluster(clusterId);
-    if (!cluster?.attributes?.namespace || !prom) {
-      return null;
-    }
+  const cluster = await getKafkaCluster(clusterId);
+
+  if (!cluster) {
+    return null;
+  }
+
+  if (!prom || !cluster.attributes.namespace) {
+    log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
+    return { cluster, ranges: null };
+  }
+
+  try {
     const rangesRes = Object.fromEntries(
       await Promise.all(
         metrics.map((m) =>
@@ -264,7 +278,7 @@ export async function getKafkaClusterMetrics(
     };
   } catch (err) {
     log.error({ err, clusterId, metric: metrics }, "getKafkaClusterMetric");
-    return null;
+    throw new Error("getKafkaClusterMetric: couldn't connect with Prometheus");
   }
 }

@@ -273,7 +287,7 @@ export async function getKafkaTopicMetrics(
   metrics: Array<TopicMetric>,
 ): Promise<{
   cluster: ClusterDetail;
-  ranges: Record<TopicMetric, MetricRange>;
+  ranges: Record<TopicMetric, MetricRange> | null;
 } | null> {
   async function getRangeByNodeId(
     namespace: string,
@@ -285,7 +299,7 @@
     const end = new Date();
     const step = 60 * 1;
     const seriesRes = await prom!.rangeQuery(
-      topic[metric](namespace, name, nodePools),
+      topicPromql[metric](namespace, name, nodePools),
       start,
       end,
       step,
@@ -303,10 +317,16 @@

   try {
     const cluster = await getKafkaCluster(clusterId);
-    if (!cluster?.attributes?.namespace || !prom) {
+
+    if (!cluster) {
       return null;
     }
+
+    if (!prom || !cluster.attributes.namespace) {
+      log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable");
+      return { cluster, ranges: null };
+    }
+
     const rangesRes = Object.fromEntries(
       await Promise.all(
         metrics.map((m) =>
@@ -329,6 +349,6 @@
     };
   } catch (err) {
     log.error({ err, clusterId, metric: metrics }, "getKafkaTopicMetrics");
-    return null;
+    throw new Error("getKafkaTopicMetric: couldn't connect with Prometheus");
   }
 }
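
Note: both metrics helpers now widen `ranges` to be nullable, so a missing Prometheus configuration no longer hides the cluster itself. A sketch of a caller adapting to the three outcomes; the names and import path follow this diff, the helper itself is hypothetical:

// Hypothetical caller: null result = cluster not found; ranges: null =
// Prometheus not configured; thrown Error = Prometheus unreachable.
import {
  getKafkaClusterMetrics,
  type ClusterMetric,
} from "@/api/kafka/actions";

export async function loadClusterCharts(
  clusterId: string,
  metrics: Array<ClusterMetric>,
) {
  const result = await getKafkaClusterMetrics(clusterId, metrics);
  if (!result) {
    return { state: "not-found" as const };
  }
  if (!result.ranges) {
    // Prometheus unset: render the cluster page without charts.
    return { state: "no-metrics" as const, cluster: result.cluster };
  }
  return { state: "ok" as const, ...result };
}
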
98 changes: 65 additions & 33 deletions ui/api/kafka/schema.ts
@@ -12,6 +12,21 @@ export const ClusterListSchema = z.object({
   type: z.literal("kafkas"),
   meta: z.object({
     configured: z.boolean(),
+    authentication: z
+      .union([
+        z.object({
+          method: z.literal("anonymous"),
+        }),
+        z.object({
+          method: z.literal("basic"),
+        }),
+        z.object({
+          method: z.literal("oauth"),
+          tokenUrl: z.string().nullable().optional(),
+        }),
+      ])
+      .nullable()
+      .optional(),
   }),
   attributes: z.object({
     name: z.string(),
@@ -35,22 +50,28 @@ const ClusterDetailSchema = z.object({
     nodes: z.array(NodeSchema),
     controller: NodeSchema,
     authorizedOperations: z.array(z.string()),
-    listeners: z.array(
-      z.object({
-        type: z.string(),
-        bootstrapServers: z.string().nullable(),
-        authType: z.string().nullable(),
-      }),
-    ).nullable().optional(),
-    conditions: z.array(
-      z.object({
-        type: z.string().optional(),
-        status: z.string().optional(),
-        reason: z.string().optional(),
-        message: z.string().optional(),
-        lastTransitionTime: z.string().optional(),
-      }),
-    ).nullable().optional(),
+    listeners: z
+      .array(
+        z.object({
+          type: z.string(),
+          bootstrapServers: z.string().nullable(),
+          authType: z.string().nullable(),
+        }),
+      )
+      .nullable()
+      .optional(),
+    conditions: z
+      .array(
+        z.object({
+          type: z.string().optional(),
+          status: z.string().optional(),
+          reason: z.string().optional(),
+          message: z.string().optional(),
+          lastTransitionTime: z.string().optional(),
+        }),
+      )
+      .nullable()
+      .optional(),
     nodePools: z.array(z.string()).optional().nullable(),
   }),
 });
@@ -64,24 +85,35 @@ export const ClusterKpisSchema = z.object({
   total_topics: z.number().optional(),
   total_partitions: z.number().optional(),
   underreplicated_topics: z.number().optional(),
-  replica_count: z.object({
-    byNode: z.record(z.number()).optional(),
-    total: z.number().optional(),
-  }).optional(),
-  leader_count: z.object({
-    byNode: z.record(z.number()).optional(),
-    total: z.number().optional(),
-  }).optional(),
-  volume_stats_capacity_bytes: z.object({
-    byNode: z.record(z.number()).optional(),
-    total: z.number().optional(),
-  }).optional(),
-  volume_stats_used_bytes: z.object({
-    byNode: z.record(z.number()).optional(),
-    total: z.number().optional(),
-  }).optional(),
+  replica_count: z
+    .object({
+      byNode: z.record(z.number()).optional(),
+      total: z.number().optional(),
+    })
+    .optional(),
+  leader_count: z
+    .object({
+      byNode: z.record(z.number()).optional(),
+      total: z.number().optional(),
+    })
+    .optional(),
+  volume_stats_capacity_bytes: z
+    .object({
+      byNode: z.record(z.number()).optional(),
+      total: z.number().optional(),
+    })
+    .optional(),
+  volume_stats_used_bytes: z
+    .object({
+      byNode: z.record(z.number()).optional(),
+      total: z.number().optional(),
+    })
+    .optional(),
 });
 export type ClusterKpis = z.infer<typeof ClusterKpisSchema>;

-export const MetricRangeSchema = z.record(z.string(), z.record(z.number()).optional());
+export const MetricRangeSchema = z.record(
+  z.string(),
+  z.record(z.number()).optional(),
+);
 export type MetricRange = z.infer<typeof MetricRangeSchema>;
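
Note: the new `meta.authentication` union is what lets the UI pick a login flow per cluster, which is the "dynamic authentication" of the PR title. A sketch of narrowing it, assuming `ClusterList` is the `z.infer` type exported alongside `ClusterListSchema`, and treating an absent block as anonymous (an assumption of this sketch, not something the schema mandates):

// Sketch only; ClusterList is assumed to be z.infer<typeof ClusterListSchema>.
import type { ClusterList } from "@/api/kafka/schema";

function loginHint(cluster: ClusterList): string {
  const auth = cluster.meta.authentication;
  // Treating a missing/null block as anonymous is an assumption here.
  if (!auth || auth.method === "anonymous") {
    return "Connect without signing in";
  }
  if (auth.method === "basic") {
    return "Sign in with username and password";
  }
  // auth.method === "oauth"; tokenUrl is still nullable/optional.
  return auth.tokenUrl
    ? `Sign in via ${auth.tokenUrl}`
    : "Sign in with your OAuth provider";
}
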