diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index f707a04347..22ac244aa9 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -276,7 +276,7 @@ func getSlotInfo(ctx context.Context, conn *pgx.Conn, slotName string, database rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s, confirmed_flush_lsn::text,active, round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END - - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind + - restart_lsn) / 1024 / 1024) AS MB_Behind FROM pg_control_checkpoint(),pg_replication_slots %s`, walStatusSelector, whereClause)) if err != nil { return nil, fmt.Errorf("failed to read information for slots: %w", err) diff --git a/ui/app/api/peers/slots/[name]/route.ts b/ui/app/api/peers/slots/[name]/route.ts index 0927193550..13f6ac97de 100644 --- a/ui/app/api/peers/slots/[name]/route.ts +++ b/ui/app/api/peers/slots/[name]/route.ts @@ -6,6 +6,30 @@ export async function GET( request: NextRequest, context: { params: { name: string } } ) { + const timeSince = request.nextUrl.searchParams.get('timeSince'); + + let forThePastThisMuchTime: number; + switch (timeSince) { + case 'day': + forThePastThisMuchTime = 86400000; + break; + case 'month': + forThePastThisMuchTime = 2592000000; + break; + case '15min': + forThePastThisMuchTime = 900000; + break; + case '5min': + forThePastThisMuchTime = 300000; + break; + case '1min': + forThePastThisMuchTime = 60000; + break; + default: + forThePastThisMuchTime = 3600000; + break; + } + const lagPoints = await prisma.peer_slot_size.findMany({ select: { updated_at: true, @@ -14,6 +38,9 @@ export async function GET( }, where: { slot_name: context.params.name, + updated_at: { + gte: new Date(Date.now() - forThePastThisMuchTime), + }, }, }); diff --git a/ui/app/peers/[peerName]/lagGraph.tsx b/ui/app/peers/[peerName]/lagGraph.tsx index 6763761ad8..31dbef3062 100644 --- a/ui/app/peers/[peerName]/lagGraph.tsx +++ b/ui/app/peers/[peerName]/lagGraph.tsx @@ -1,6 +1,5 @@ 'use client'; import { SlotLagPoint } from '@/app/dto/PeersDTO'; -import aggregateCountsByInterval from '@/app/mirrors/[mirrorId]/aggregatedCountsByInterval'; import { formatGraphLabel, timeOptions } from '@/app/utils/graph'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle/ProgressCircle'; @@ -13,17 +12,20 @@ function LagGraph({ slotNames }: { slotNames: string[] }) { const [lagPoints, setLagPoints] = useState([]); const [defaultSlot, setDefaultSlot] = useLocalStorage('defaultSlot', ''); const [selectedSlot, setSelectedSlot] = useState(defaultSlot); - let [aggregateType, setAggregateType] = useState('hour'); + let [timeSince, setTimeSince] = useState('hour'); const fetchLagPoints = useCallback(async () => { if (selectedSlot == '') { return; } - const pointsRes = await fetch(`/api/peers/slots/${selectedSlot}`, { - cache: 'no-store', - }); + const pointsRes = await fetch( + `/api/peers/slots/${selectedSlot}?timeSince=${timeSince}`, + { + cache: 'no-store', + } + ); const points: SlotLagPoint[] = await pointsRes.json(); setLagPoints(points); - }, [selectedSlot]); + }, [selectedSlot, timeSince]); const handleChange = (val: string) => { setDefaultSlot(val); @@ -31,20 +33,15 @@ function LagGraph({ slotNames }: { slotNames: string[] }) { }; const graphValues = useMemo(() => { - let lagDataDot = aggregateCountsByInterval( - lagPoints.map((point) => ({ - timestamp: point.updatedAt, - count: parseInt(point.slotSize || '0', 10) || 0, - })), - aggregateType - ); - lagDataDot = lagDataDot.slice(0, 29); - lagDataDot = lagDataDot.reverse(); + let lagDataDot = lagPoints.map((point) => [ + point.updatedAt, + point.slotSize, + ]); return lagDataDot.map((data) => ({ - time: formatGraphLabel(new Date(data[0]), aggregateType), + time: formatGraphLabel(new Date(data[0]!), timeSince), 'Lag in MB': data[1], })); - }, [lagPoints, aggregateType]); + }, [lagPoints, timeSince]); const [mounted, setMounted] = useState(false); useEffect(() => { @@ -81,20 +78,28 @@ function LagGraph({ slotNames }: { slotNames: string[] }) { ({ - label: slotName, - value: slotName, - }))} + options={ + slotNames.length === 0 + ? undefined + : slotNames.map((slotName) => ({ + label: slotName, + value: slotName, + })) + } onChange={(val, _) => val && handleChange(val.value)} - defaultValue={{ value: selectedSlot, label: selectedSlot }} + defaultValue={ + selectedSlot + ? { value: selectedSlot, label: selectedSlot } + : undefined + } /> val && setAggregateType(val.value)} + onChange={(val, _) => val && setTimeSince(val.value)} />