From b460a57145dd3d812b4c58e622fbb0f430894211 Mon Sep 17 00:00:00 2001 From: Wes Date: Thu, 2 Jan 2025 10:37:44 -0700 Subject: [PATCH] feat: use infinite query for streaming endpoint --- .../api/timeline/use-module-trace-events.ts | 7 ++-- .../api/timeline/use-request-trace-events.ts | 22 +++++++----- .../console/src/api/timeline/use-timeline.ts | 34 ++++++++++++++----- .../src/features/timeline/Timeline.tsx | 5 ++- 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/frontend/console/src/api/timeline/use-module-trace-events.ts b/frontend/console/src/api/timeline/use-module-trace-events.ts index 03ebf8b6a0..6b691244cd 100644 --- a/frontend/console/src/api/timeline/use-module-trace-events.ts +++ b/frontend/console/src/api/timeline/use-module-trace-events.ts @@ -1,4 +1,4 @@ -import { EventType } from '../../protos/xyz/block/ftl/timeline/v1/event_pb.ts' +import { type Event, EventType } from '../../protos/xyz/block/ftl/timeline/v1/event_pb.ts' import type { GetTimelineRequest_Filter } from '../../protos/xyz/block/ftl/timeline/v1/timeline_pb.ts' import { eventTypesFilter, moduleFilter } from './timeline-filters.ts' import { useTimeline } from './use-timeline.ts' @@ -8,7 +8,10 @@ export const useModuleTraceEvents = (module: string, verb?: string, filters: Get const allFilters = [...filters, moduleFilter(module, verb), eventTypesFilter(eventTypes)] const timelineQuery = useTimeline(true, allFilters, 500) - const data = timelineQuery.data?.filter((event) => event.entry.case === 'call' || event.entry.case === 'ingress') ?? [] + const data = (timelineQuery.data?.pages ?? []) + .flatMap((page): Event[] => (Array.isArray(page) ? page : [])) + .filter((event) => 'entry' in event && (event.entry.case === 'call' || event.entry.case === 'ingress')) + return { ...timelineQuery, data, diff --git a/frontend/console/src/api/timeline/use-request-trace-events.ts b/frontend/console/src/api/timeline/use-request-trace-events.ts index aca16d82a9..2c846bca7b 100644 --- a/frontend/console/src/api/timeline/use-request-trace-events.ts +++ b/frontend/console/src/api/timeline/use-request-trace-events.ts @@ -1,6 +1,7 @@ import { type AsyncExecuteEvent, type CallEvent, + type Event, EventType, type IngressEvent, type PubSubConsumeEvent, @@ -18,15 +19,20 @@ export const useRequestTraceEvents = (requestKey?: string, filters: GetTimelineR const allFilters = [...filters, requestKeysFilter([requestKey || '']), eventTypesFilter(eventTypes)] const timelineQuery = useTimeline(true, allFilters, 500, !!requestKey) - const data = - timelineQuery.data?.filter( + const data = (timelineQuery.data?.pages ?? []) + .flatMap((page): Event[] => (Array.isArray(page) ? page : [])) + .filter( (event) => - event.entry.case === 'call' || - event.entry.case === 'ingress' || - event.entry.case === 'asyncExecute' || - event.entry.case === 'pubsubPublish' || - event.entry.case === 'pubsubConsume', - ) ?? [] + 'entry' in event && + event.entry && + typeof event.entry === 'object' && + 'case' in event.entry && + (event.entry.case === 'call' || + event.entry.case === 'ingress' || + event.entry.case === 'asyncExecute' || + event.entry.case === 'pubsubPublish' || + event.entry.case === 'pubsubConsume'), + ) return { ...timelineQuery, diff --git a/frontend/console/src/api/timeline/use-timeline.ts b/frontend/console/src/api/timeline/use-timeline.ts index 5487d648d4..333861ebcb 100644 --- a/frontend/console/src/api/timeline/use-timeline.ts +++ b/frontend/console/src/api/timeline/use-timeline.ts @@ -1,11 +1,10 @@ import { Code, ConnectError } from '@connectrpc/connect' -import { useQuery, useQueryClient } from '@tanstack/react-query' +import { type InfiniteData, useInfiniteQuery, useQueryClient } from '@tanstack/react-query' import { useClient } from '../../hooks/use-client' import { useVisibility } from '../../hooks/use-visibility' import { ConsoleService } from '../../protos/xyz/block/ftl/console/v1/console_connect' import type { Event } from '../../protos/xyz/block/ftl/timeline/v1/event_pb' import { type GetTimelineRequest_Filter, GetTimelineRequest_Order } from '../../protos/xyz/block/ftl/timeline/v1/timeline_pb' -import { compareTimestamps } from '../../utils/date.utils' const timelineKey = 'timeline' const maxTimelineEntries = 1000 @@ -40,8 +39,8 @@ export const useTimeline = (isStreaming: boolean, filters: GetTimelineRequest_Fi console.debug('streaming timeline') console.debug('timeline-filters:', filters) - // Clear the cache when starting a new stream - queryClient.setQueryData(queryKey, (_ = []) => []) + // Initialize with empty pages instead of clearing cache + queryClient.setQueryData(queryKey, { pages: [], pageParams: [] }) for await (const response of client.streamTimeline( { updateInterval: { seconds: BigInt(0), nanos: updateIntervalMs * 1000 }, query: { limit, filters, order } }, @@ -49,8 +48,25 @@ export const useTimeline = (isStreaming: boolean, filters: GetTimelineRequest_Fi )) { console.debug('timeline-response:', response) if (response.events) { - queryClient.setQueryData(queryKey, (prev = []) => { - return [...response.events, ...prev].sort((a, b) => compareTimestamps(b.timestamp, a.timestamp)).slice(0, maxTimelineEntries) + queryClient.setQueryData>(queryKey, (old = { pages: [], pageParams: [] }) => { + const newEvents = response.events + const existingEvents = old.pages[0] || [] + const uniqueNewEvents = newEvents.filter((newEvent) => !existingEvents.some((existingEvent) => existingEvent.id === newEvent.id)) + + // Combine and sort all events by timestamp + const allEvents = [...uniqueNewEvents, ...existingEvents] + .sort((a, b) => { + const aTime = a.timestamp + const bTime = b.timestamp + if (!aTime || !bTime) return 0 + return Number(bTime.seconds - aTime.seconds) || Number(bTime.nanos - aTime.nanos) + }) + .slice(0, maxTimelineEntries) + + return { + pages: [allEvents, ...old.pages.slice(1)], + pageParams: old.pageParams, + } }) } } @@ -65,9 +81,11 @@ export const useTimeline = (isStreaming: boolean, filters: GetTimelineRequest_Fi } } - return useQuery({ + return useInfiniteQuery({ queryKey: queryKey, - queryFn: async ({ signal }) => (isStreaming ? streamTimeline({ signal }) : fetchTimeline({ signal })), + queryFn: async ({ signal }) => (isStreaming ? streamTimeline({ signal }) : { pages: [await fetchTimeline({ signal })], pageParams: [] }), enabled: enabled && isVisible, + getNextPageParam: () => null, // Disable pagination for streaming + initialPageParam: null, // Disable pagination for streaming }) } diff --git a/frontend/console/src/features/timeline/Timeline.tsx b/frontend/console/src/features/timeline/Timeline.tsx index 8d76ff3fb0..a2af9452f6 100644 --- a/frontend/console/src/features/timeline/Timeline.tsx +++ b/frontend/console/src/features/timeline/Timeline.tsx @@ -1,4 +1,4 @@ -import { useContext, useEffect, useState } from 'react' +import { useContext, useEffect, useMemo, useState } from 'react' import { useSearchParams } from 'react-router-dom' import { timeFilter, useTimeline } from '../../api/timeline/index.ts' import { Loader } from '../../components/Loader.tsx' @@ -31,6 +31,7 @@ export const Timeline = ({ timeSettings, filters }: { timeSettings: TimeSettings const streamTimeline = timeSettings.isTailing && !timeSettings.isPaused const timeline = useTimeline(streamTimeline, eventFilters) + const entries = useMemo(() => (timeline.data?.pages ?? []).flatMap((page): Event[] => (Array.isArray(page) ? page : [])), [timeline.data?.pages]) useEffect(() => { if (!isOpen) { @@ -94,8 +95,6 @@ export const Timeline = ({ timeSettings, filters }: { timeSettings: TimeSettings ) } - const entries = timeline.data || [] - return (