Skip to content

Commit

Permalink
feat: use infinite query for streaming endpoint (#3885)
Browse files Browse the repository at this point in the history
This integrates [ tanstack infinite query
](https://tanstack.com/query/latest/docs/framework/react/guides/infinite-queries)
to avoid reloading the entire timeline when new events arrive.
  • Loading branch information
wesbillman authored Jan 2, 2025
1 parent b1dbc42 commit 17e177c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 21 deletions.
7 changes: 5 additions & 2 deletions frontend/console/src/api/timeline/use-module-trace-events.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventType } from '../../protos/xyz/block/ftl/timeline/v1/event_pb.ts'
import { type Event, EventType } from '../../protos/xyz/block/ftl/timeline/v1/event_pb.ts'
import type { GetTimelineRequest_Filter } from '../../protos/xyz/block/ftl/timeline/v1/timeline_pb.ts'
import { eventTypesFilter, moduleFilter } from './timeline-filters.ts'
import { useTimeline } from './use-timeline.ts'
Expand All @@ -8,7 +8,10 @@ export const useModuleTraceEvents = (module: string, verb?: string, filters: Get
const allFilters = [...filters, moduleFilter(module, verb), eventTypesFilter(eventTypes)]
const timelineQuery = useTimeline(true, allFilters, 500)

const data = timelineQuery.data?.filter((event) => event.entry.case === 'call' || event.entry.case === 'ingress') ?? []
const data = (timelineQuery.data?.pages ?? [])
.flatMap((page): Event[] => (Array.isArray(page) ? page : []))
.filter((event) => 'entry' in event && (event.entry.case === 'call' || event.entry.case === 'ingress'))

return {
...timelineQuery,
data,
Expand Down
22 changes: 14 additions & 8 deletions frontend/console/src/api/timeline/use-request-trace-events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
type AsyncExecuteEvent,
type CallEvent,
type Event,
EventType,
type IngressEvent,
type PubSubConsumeEvent,
Expand All @@ -18,15 +19,20 @@ export const useRequestTraceEvents = (requestKey?: string, filters: GetTimelineR
const allFilters = [...filters, requestKeysFilter([requestKey || '']), eventTypesFilter(eventTypes)]
const timelineQuery = useTimeline(true, allFilters, 500, !!requestKey)

const data =
timelineQuery.data?.filter(
const data = (timelineQuery.data?.pages ?? [])
.flatMap((page): Event[] => (Array.isArray(page) ? page : []))
.filter(
(event) =>
event.entry.case === 'call' ||
event.entry.case === 'ingress' ||
event.entry.case === 'asyncExecute' ||
event.entry.case === 'pubsubPublish' ||
event.entry.case === 'pubsubConsume',
) ?? []
'entry' in event &&
event.entry &&
typeof event.entry === 'object' &&
'case' in event.entry &&
(event.entry.case === 'call' ||
event.entry.case === 'ingress' ||
event.entry.case === 'asyncExecute' ||
event.entry.case === 'pubsubPublish' ||
event.entry.case === 'pubsubConsume'),
)

return {
...timelineQuery,
Expand Down
34 changes: 26 additions & 8 deletions frontend/console/src/api/timeline/use-timeline.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Code, ConnectError } from '@connectrpc/connect'
import { useQuery, useQueryClient } from '@tanstack/react-query'
import { type InfiniteData, useInfiniteQuery, useQueryClient } from '@tanstack/react-query'
import { useClient } from '../../hooks/use-client'
import { useVisibility } from '../../hooks/use-visibility'
import { ConsoleService } from '../../protos/xyz/block/ftl/console/v1/console_connect'
import type { Event } from '../../protos/xyz/block/ftl/timeline/v1/event_pb'
import { type GetTimelineRequest_Filter, GetTimelineRequest_Order } from '../../protos/xyz/block/ftl/timeline/v1/timeline_pb'
import { compareTimestamps } from '../../utils/date.utils'

const timelineKey = 'timeline'
const maxTimelineEntries = 1000
Expand Down Expand Up @@ -40,17 +39,34 @@ export const useTimeline = (isStreaming: boolean, filters: GetTimelineRequest_Fi
console.debug('streaming timeline')
console.debug('timeline-filters:', filters)

// Clear the cache when starting a new stream
queryClient.setQueryData<Event[]>(queryKey, (_ = []) => [])
// Initialize with empty pages instead of clearing cache
queryClient.setQueryData(queryKey, { pages: [], pageParams: [] })

for await (const response of client.streamTimeline(
{ updateInterval: { seconds: BigInt(0), nanos: updateIntervalMs * 1000 }, query: { limit, filters, order } },
{ signal },
)) {
console.debug('timeline-response:', response)
if (response.events) {
queryClient.setQueryData<Event[]>(queryKey, (prev = []) => {
return [...response.events, ...prev].sort((a, b) => compareTimestamps(b.timestamp, a.timestamp)).slice(0, maxTimelineEntries)
queryClient.setQueryData<InfiniteData<Event[]>>(queryKey, (old = { pages: [], pageParams: [] }) => {
const newEvents = response.events
const existingEvents = old.pages[0] || []
const uniqueNewEvents = newEvents.filter((newEvent) => !existingEvents.some((existingEvent) => existingEvent.id === newEvent.id))

// Combine and sort all events by timestamp
const allEvents = [...uniqueNewEvents, ...existingEvents]
.sort((a, b) => {
const aTime = a.timestamp
const bTime = b.timestamp
if (!aTime || !bTime) return 0
return Number(bTime.seconds - aTime.seconds) || Number(bTime.nanos - aTime.nanos)
})
.slice(0, maxTimelineEntries)

return {
pages: [allEvents, ...old.pages.slice(1)],
pageParams: old.pageParams,
}
})
}
}
Expand All @@ -65,9 +81,11 @@ export const useTimeline = (isStreaming: boolean, filters: GetTimelineRequest_Fi
}
}

return useQuery({
return useInfiniteQuery({
queryKey: queryKey,
queryFn: async ({ signal }) => (isStreaming ? streamTimeline({ signal }) : fetchTimeline({ signal })),
queryFn: async ({ signal }) => (isStreaming ? streamTimeline({ signal }) : { pages: [await fetchTimeline({ signal })], pageParams: [] }),
enabled: enabled && isVisible,
getNextPageParam: () => null, // Disable pagination for streaming
initialPageParam: null, // Disable pagination for streaming
})
}
5 changes: 2 additions & 3 deletions frontend/console/src/features/timeline/Timeline.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { useContext, useEffect, useState } from 'react'
import { useContext, useEffect, useMemo, useState } from 'react'
import { useSearchParams } from 'react-router-dom'
import { timeFilter, useTimeline } from '../../api/timeline/index.ts'
import { Loader } from '../../components/Loader.tsx'
Expand Down Expand Up @@ -31,6 +31,7 @@ export const Timeline = ({ timeSettings, filters }: { timeSettings: TimeSettings
const streamTimeline = timeSettings.isTailing && !timeSettings.isPaused

const timeline = useTimeline(streamTimeline, eventFilters)
const entries = useMemo(() => (timeline.data?.pages ?? []).flatMap((page): Event[] => (Array.isArray(page) ? page : [])), [timeline.data?.pages])

useEffect(() => {
if (!isOpen) {
Expand Down Expand Up @@ -94,8 +95,6 @@ export const Timeline = ({ timeSettings, filters }: { timeSettings: TimeSettings
)
}

const entries = timeline.data || []

return (
<div className='border border-gray-100 dark:border-slate-700 rounded m-2'>
<TimelineEventList events={entries} selectedEventId={selectedEntry?.id} handleEntryClicked={handleEntryClicked} />
Expand Down

0 comments on commit 17e177c

Please sign in to comment.