Skip to content

Commit

Permalink
UI: Fix slot graph (#1364)
Browse files Browse the repository at this point in the history
- Fix lag point computation
- Use `pg_current_wal_lsn() - restart_lsn` instead of
confirmed_flush_lsn
- Fix placeholder not showing in slot dropdown

![Screenshot 2024-02-23 at 1 11
05 PM](https://github.com/PeerDB-io/peerdb/assets/65964360/33e3d478-f30f-4a22-82f6-88d0f86b08a7)
  • Loading branch information
Amogh-Bharadwaj authored Feb 26, 2024
1 parent b75bc46 commit 98e6d01
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 25 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func getSlotInfo(ctx context.Context, conn *pgx.Conn, slotName string, database
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
confirmed_flush_lsn::text,active,
round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END
- confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind
- restart_lsn) / 1024 / 1024) AS MB_Behind
FROM pg_control_checkpoint(),pg_replication_slots %s`, walStatusSelector, whereClause))
if err != nil {
return nil, fmt.Errorf("failed to read information for slots: %w", err)
Expand Down
27 changes: 27 additions & 0 deletions ui/app/api/peers/slots/[name]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,30 @@ export async function GET(
request: NextRequest,
context: { params: { name: string } }
) {
const timeSince = request.nextUrl.searchParams.get('timeSince');

let forThePastThisMuchTime: number;
switch (timeSince) {
case 'day':
forThePastThisMuchTime = 86400000;
break;
case 'month':
forThePastThisMuchTime = 2592000000;
break;
case '15min':
forThePastThisMuchTime = 900000;
break;
case '5min':
forThePastThisMuchTime = 300000;
break;
case '1min':
forThePastThisMuchTime = 60000;
break;
default:
forThePastThisMuchTime = 3600000;
break;
}

const lagPoints = await prisma.peer_slot_size.findMany({
select: {
updated_at: true,
Expand All @@ -14,6 +38,9 @@ export async function GET(
},
where: {
slot_name: context.params.name,
updated_at: {
gte: new Date(Date.now() - forThePastThisMuchTime),
},
},
});

Expand Down
53 changes: 29 additions & 24 deletions ui/app/peers/[peerName]/lagGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use client';
import { SlotLagPoint } from '@/app/dto/PeersDTO';
import aggregateCountsByInterval from '@/app/mirrors/[mirrorId]/aggregatedCountsByInterval';
import { formatGraphLabel, timeOptions } from '@/app/utils/graph';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle/ProgressCircle';
Expand All @@ -13,38 +12,36 @@ function LagGraph({ slotNames }: { slotNames: string[] }) {
const [lagPoints, setLagPoints] = useState<SlotLagPoint[]>([]);
const [defaultSlot, setDefaultSlot] = useLocalStorage('defaultSlot', '');
const [selectedSlot, setSelectedSlot] = useState<string>(defaultSlot);
let [aggregateType, setAggregateType] = useState('hour');
let [timeSince, setTimeSince] = useState('hour');
const fetchLagPoints = useCallback(async () => {
if (selectedSlot == '') {
return;
}
const pointsRes = await fetch(`/api/peers/slots/${selectedSlot}`, {
cache: 'no-store',
});
const pointsRes = await fetch(
`/api/peers/slots/${selectedSlot}?timeSince=${timeSince}`,
{
cache: 'no-store',
}
);
const points: SlotLagPoint[] = await pointsRes.json();
setLagPoints(points);
}, [selectedSlot]);
}, [selectedSlot, timeSince]);

const handleChange = (val: string) => {
setDefaultSlot(val);
setSelectedSlot(val);
};

const graphValues = useMemo(() => {
let lagDataDot = aggregateCountsByInterval(
lagPoints.map((point) => ({
timestamp: point.updatedAt,
count: parseInt(point.slotSize || '0', 10) || 0,
})),
aggregateType
);
lagDataDot = lagDataDot.slice(0, 29);
lagDataDot = lagDataDot.reverse();
let lagDataDot = lagPoints.map((point) => [
point.updatedAt,
point.slotSize,
]);
return lagDataDot.map((data) => ({
time: formatGraphLabel(new Date(data[0]), aggregateType),
time: formatGraphLabel(new Date(data[0]!), timeSince),
'Lag in MB': data[1],
}));
}, [lagPoints, aggregateType]);
}, [lagPoints, timeSince]);

const [mounted, setMounted] = useState(false);
useEffect(() => {
Expand Down Expand Up @@ -81,20 +78,28 @@ function LagGraph({ slotNames }: { slotNames: string[] }) {
<ReactSelect
className='w-1/4'
placeholder='Select a replication slot'
options={slotNames.map((slotName) => ({
label: slotName,
value: slotName,
}))}
options={
slotNames.length === 0
? undefined
: slotNames.map((slotName) => ({
label: slotName,
value: slotName,
}))
}
onChange={(val, _) => val && handleChange(val.value)}
defaultValue={{ value: selectedSlot, label: selectedSlot }}
defaultValue={
selectedSlot
? { value: selectedSlot, label: selectedSlot }
: undefined
}
/>

<ReactSelect
id={aggregateType}
id={timeSince}
placeholder='Select a timeframe'
options={timeOptions}
defaultValue={{ label: 'hour', value: 'hour' }}
onChange={(val, _) => val && setAggregateType(val.value)}
onChange={(val, _) => val && setTimeSince(val.value)}
/>
</div>
<LineChart
Expand Down

0 comments on commit 98e6d01

Please sign in to comment.