Skip to content

Commit

Permalink
Fix messages not refreshing (#489)
Browse files Browse the repository at this point in the history
* Fix messages not refreshing

* Explicitly filter null and undefined values to avoid filtering valid values like 0 (zero)

* Reduce the amount of messages per page users can select
  • Loading branch information
riccardo-forina authored Feb 16, 2024
1 parent b12cd1e commit fc0b459
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
"use client";

import { Message } from "@/api/messages/schema";
import { RefreshInterval, RefreshSelector } from "@/components/RefreshSelector";
import { ConnectedRefreshSelector } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/ConnectedRefreshSelector";
import { RefreshInterval } from "@/components/RefreshSelector";
import { useFilterParams } from "@/utils/useFilterParams";
import { useEffect, useLayoutEffect, useState, useTransition } from "react";
import { createPortal } from "react-dom";
import { useCallback, useEffect, useState, useTransition } from "react";
import { MessagesTable } from "./_components/MessagesTable";

export function ConnectedMessagesTable({
messages,
lastRefresh,
messages: initialData,
lastRefresh: initialRefresh,
selectedMessage,
partitions,
params,
refresh,
}: {
messages: Message[];
lastRefresh: Date | undefined;
Expand All @@ -26,11 +27,15 @@ export function ConnectedMessagesTable({
"filter[epoch]": number | undefined;
limit: number;
};
refresh: () => Promise<{ messages: Message[]; ts: Date }>;
}) {
const [{ messages, lastRefresh }, setMessages] = useState({
messages: initialData,
lastRefresh: initialRefresh,
});
const [automaticRefresh, setAutomaticRefresh] = useState(false);
const [refreshInterval, setRefreshInterval] = useState<RefreshInterval>(1);
const [isPending, startTransition] = useTransition();
const [isAutorefreshing, startAutorefreshTransition] = useTransition();
const updateUrl = useFilterParams(params);

function setPartition(partition: number | undefined) {
Expand Down Expand Up @@ -113,25 +118,29 @@ export function ConnectedMessagesTable({
});
}

function refresh() {
startTransition(() => {
updateUrl({ ...params, _ts: Date.now() });
});
}
const doRefresh = useCallback(
async function doRefresh() {
const { messages, ts } = await refresh();
startTransition(() => {
setMessages({ messages, lastRefresh: ts });
});
},
[refresh],
);

useEffect(() => {
let interval: ReturnType<typeof setInterval>;
if (automaticRefresh) {
interval = setInterval(async () => {
if (!isAutorefreshing) {
startAutorefreshTransition(() =>
updateUrl({ ...params, _ts: Date.now() }),
);
}
}, refreshInterval * 1000);
let t: ReturnType<typeof setTimeout>;

async function tick() {
if (automaticRefresh) {
await doRefresh();
}
t = setTimeout(tick, refreshInterval * 1000);
}
return () => clearInterval(interval);
}, [params, updateUrl, automaticRefresh, refreshInterval, isAutorefreshing]);

void tick();
return () => clearTimeout(t);
}, [params, updateUrl, automaticRefresh, refreshInterval, doRefresh]);

return (
<>
Expand All @@ -155,61 +164,16 @@ export function ConnectedMessagesTable({
onSelectMessage={setSelected}
onDeselectMessage={deselectMessage}
/>

<ConnectedRefreshSelector
isRefreshing={isPending}
isLive={automaticRefresh}
refreshInterval={refreshInterval}
lastRefresh={lastRefresh}
onRefresh={refresh}
onRefresh={doRefresh}
onRefreshInterval={setRefreshInterval}
onToggleLive={setAutomaticRefresh}
/>
</>
);
}

function ConnectedRefreshSelector({
isRefreshing,
isLive,
refreshInterval,
lastRefresh,
onRefresh,
onRefreshInterval,
onToggleLive,
}: {
isRefreshing: boolean;
isLive: boolean;
refreshInterval: RefreshInterval;
lastRefresh: Date | undefined;
onRefresh: () => void;
onRefreshInterval: (interval: RefreshInterval) => void;
onToggleLive: (enable: boolean) => void;
}) {
const [container, setContainer] = useState<Element | undefined>();

function seekContainer() {
const el = document.getElementById("topic-header-portal");
if (el) {
setContainer(el);
} else {
setTimeout(seekContainer, 100);
}
}

useLayoutEffect(seekContainer, []); // we want to run this just once

return container
? createPortal(
<RefreshSelector
isRefreshing={isRefreshing}
isLive={isLive}
refreshInterval={refreshInterval}
lastRefresh={lastRefresh}
onRefresh={onRefresh}
onToggleLive={onToggleLive}
onIntervalChange={onRefreshInterval}
/>,
container,
)
: null;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { RefreshInterval, RefreshSelector } from "@/components/RefreshSelector";
import { useLayoutEffect, useState } from "react";
import { createPortal } from "react-dom";

export function ConnectedRefreshSelector({
isRefreshing,
isLive,
refreshInterval,
lastRefresh,
onRefresh,
onRefreshInterval,
onToggleLive,
}: {
isRefreshing: boolean;
isLive: boolean;
refreshInterval: RefreshInterval;
lastRefresh: Date | undefined;
onRefresh: () => void;
onRefreshInterval: (interval: RefreshInterval) => void;
onToggleLive: (enable: boolean) => void;
}) {
const [container, setContainer] = useState<Element | undefined>();

function seekContainer() {
const el = document.getElementById("topic-header-portal");
if (el) {
setContainer(el);
} else {
setTimeout(seekContainer, 100);
}
}

useLayoutEffect(seekContainer, []); // we want to run this just once

return container
? createPortal(
<RefreshSelector
isRefreshing={isRefreshing}
isLive={isLive}
refreshInterval={refreshInterval}
lastRefresh={lastRefresh}
onRefresh={onRefresh}
onToggleLive={onToggleLive}
onIntervalChange={onRefreshInterval}
/>,
container,
)
: null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export function LimitSelector({
</MenuToggle>
)}
>
{[20, 50, 100, 200, 500, 1000].map((value, idx) => (
{[5, 10, 25, 50, 75, 100].map((value, idx) => (
<SelectOption key={idx} value={value} onClick={() => onChange(value)}>
{value}
</SelectOption>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ export function MessagesTable({
>
<OuterScrollContainer>
<MessagesTableToolbar
isRefreshing={isRefreshing}
partitions={partitions}
partition={partition}
limit={limit}
Expand Down Expand Up @@ -335,7 +334,6 @@ export function MessagesTable({
}

export function MessagesTableToolbar({
isRefreshing,
filterEpoch,
filterTimestamp,
filterOffset,
Expand All @@ -351,7 +349,6 @@ export function MessagesTableToolbar({
onColumnManagement,
}: Pick<
MessageBrowserProps,
| "isRefreshing"
| "filterEpoch"
| "filterTimestamp"
| "filterOffset"
Expand Down Expand Up @@ -384,7 +381,7 @@ export function MessagesTableToolbar({
}}
>
<FilterGroup
isDisabled={isRefreshing}
isDisabled={false}
offset={filterOffset}
epoch={filterEpoch}
timestamp={filterTimestamp}
Expand All @@ -404,7 +401,7 @@ export function MessagesTableToolbar({
value={partition}
partitions={partitions}
onChange={onPartitionChange}
isDisabled={isRefreshing}
isDisabled={false}
/>
</ToolbarItem>

Expand All @@ -418,7 +415,7 @@ export function MessagesTableToolbar({
>
<ToolbarItem>
<FilterGroup
isDisabled={isRefreshing}
isDisabled={false}
offset={filterOffset}
epoch={filterEpoch}
timestamp={filterTimestamp}
Expand All @@ -433,7 +430,7 @@ export function MessagesTableToolbar({
value={partition}
partitions={partitions}
onChange={onPartitionChange}
isDisabled={isRefreshing}
isDisabled={false}
/>
</ToolbarItem>
</ToolbarToggleGroup>
Expand All @@ -448,7 +445,7 @@ export function MessagesTableToolbar({
<LimitSelector
value={limit}
onChange={onLimitChange}
isDisabled={isRefreshing}
isDisabled={false}
/>
</ToolbarGroup>
<ToolbarGroup variant="icon-button-group">
Expand Down Expand Up @@ -477,7 +474,6 @@ export function MessagesTableSkeleton({
aria-label={t("title")}
>
<MessagesTableToolbar
isRefreshing={true}
partitions={1}
partition={partition}
limit={limit}
Expand Down
54 changes: 16 additions & 38 deletions ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/page.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getTopicMessage, getTopicMessages } from "@/api/messages/actions";
import { Message } from "@/api/messages/schema";
import { getTopic } from "@/api/topics/actions";
import { MessagesTableSkeleton } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessagesTable";
import { NoDataEmptyState } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/NoDataEmptyState";
import { ConnectedMessagesTable } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/ConnectedMessagesTable";
import {
Expand All @@ -9,38 +9,8 @@ import {
} from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/parseSearchParams";
import { KafkaTopicParams } from "@/app/[locale]/kafka/[kafkaId]/topics/kafkaTopic.params";
import { redirect } from "@/navigation";
import { Suspense } from "react";

export default function MessagesPage({
params: { kafkaId, topicId },
searchParams,
}: {
params: KafkaTopicParams;
searchParams: MessagesSearchParams;
}) {
const { partition, offset, timestamp, epoch, limit } =
parseSearchParams(searchParams);
return (
<Suspense
fallback={
<MessagesTableSkeleton
limit={limit}
partition={partition}
filterTimestamp={timestamp}
filterEpoch={epoch}
filterOffset={offset}
/>
}
>
<ConnectedMessagesPage
params={{ kafkaId, topicId }}
searchParams={searchParams}
/>
</Suspense>
);
}

async function ConnectedMessagesPage({
export default async function ConnectedMessagesPage({
params: { kafkaId, topicId },
searchParams,
}: {
Expand All @@ -63,12 +33,17 @@ async function ConnectedMessagesPage({
epoch,
} = parseSearchParams(searchParams);

const { messages, ts } = await getTopicMessages(kafkaId, topicId, {
pageSize: limit,
partition,
filter,
maxValueLength: 150,
});
async function refresh(): Promise<{ messages: Message[]; ts: Date }> {
"use server";
const { messages, ts } = await getTopicMessages(kafkaId, topicId, {
pageSize: limit,
partition,
filter,
maxValueLength: 150,
});
return { messages, ts };
}

const selectedMessage =
selectedOffset !== undefined && selectedPartition !== undefined
? await getTopicMessage(kafkaId, topicId, {
Expand All @@ -79,6 +54,8 @@ async function ConnectedMessagesPage({

const isFiltered = partition || epoch || offset || timestamp;

const { messages, ts } = await refresh();

switch (true) {
case !isFiltered && (messages === null || messages.length === 0):
return <NoDataEmptyState />;
Expand All @@ -97,6 +74,7 @@ async function ConnectedMessagesPage({
"filter[epoch]": epoch,
"filter[offset]": offset,
}}
refresh={refresh}
/>
);
}
Expand Down
3 changes: 1 addition & 2 deletions ui/components/RefreshSelector.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ export function RefreshSelector({
ref={toggleRef}
onClick={toggleOpen}
isExpanded={isOpen}
isDisabled={isRefreshing}
variant={"default"}
splitButtonOptions={{
variant: "action",
Expand Down Expand Up @@ -98,7 +97,7 @@ export function RefreshSelector({
key={"refresh"}
onClick={onRefresh}
isRefreshing={isRefreshing}
isDisabled={isRefreshing}
isDisabled={isRefreshing || isLive}
/>
</FlexItem>
</Flex>
Expand Down
Loading

0 comments on commit fc0b459

Please sign in to comment.