Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UI: Fix slot graph #1364

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func getSlotInfo(ctx context.Context, conn *pgx.Conn, slotName string, database
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
confirmed_flush_lsn::text,active,
round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END
- confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind
- restart_lsn) / 1024 / 1024) AS MB_Behind
FROM pg_control_checkpoint(),pg_replication_slots %s`, walStatusSelector, whereClause))
if err != nil {
return nil, fmt.Errorf("failed to read information for slots: %w", err)
Expand Down
27 changes: 27 additions & 0 deletions ui/app/api/peers/slots/[name]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,30 @@ export async function GET(
request: NextRequest,
context: { params: { name: string } }
) {
const timeSince = request.nextUrl.searchParams.get('timeSince');

let forThePastThisMuchTime: number;
switch (timeSince) {
case 'day':
forThePastThisMuchTime = 86400000;
break;
case 'month':
forThePastThisMuchTime = 2592000000;
break;
case '15min':
forThePastThisMuchTime = 900000;
break;
case '5min':
forThePastThisMuchTime = 300000;
break;
case '1min':
forThePastThisMuchTime = 60000;
break;
default:
forThePastThisMuchTime = 3600000;
break;
}

const lagPoints = await prisma.peer_slot_size.findMany({
select: {
updated_at: true,
Expand All @@ -14,6 +38,9 @@ export async function GET(
},
where: {
slot_name: context.params.name,
updated_at: {
gte: new Date(Date.now() - forThePastThisMuchTime),
},
},
});

Expand Down
53 changes: 29 additions & 24 deletions ui/app/peers/[peerName]/lagGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use client';
import { SlotLagPoint } from '@/app/dto/PeersDTO';
import aggregateCountsByInterval from '@/app/mirrors/[mirrorId]/aggregatedCountsByInterval';
import { formatGraphLabel, timeOptions } from '@/app/utils/graph';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle/ProgressCircle';
Expand All @@ -13,38 +12,36 @@ function LagGraph({ slotNames }: { slotNames: string[] }) {
const [lagPoints, setLagPoints] = useState<SlotLagPoint[]>([]);
const [defaultSlot, setDefaultSlot] = useLocalStorage('defaultSlot', '');
const [selectedSlot, setSelectedSlot] = useState<string>(defaultSlot);
let [aggregateType, setAggregateType] = useState('hour');
let [timeSince, setTimeSince] = useState('hour');
const fetchLagPoints = useCallback(async () => {
if (selectedSlot == '') {
return;
}
const pointsRes = await fetch(`/api/peers/slots/${selectedSlot}`, {
cache: 'no-store',
});
const pointsRes = await fetch(
`/api/peers/slots/${selectedSlot}?timeSince=${timeSince}`,
{
cache: 'no-store',
}
);
const points: SlotLagPoint[] = await pointsRes.json();
setLagPoints(points);
}, [selectedSlot]);
}, [selectedSlot, timeSince]);

const handleChange = (val: string) => {
setDefaultSlot(val);
setSelectedSlot(val);
};

const graphValues = useMemo(() => {
let lagDataDot = aggregateCountsByInterval(
lagPoints.map((point) => ({
timestamp: point.updatedAt,
count: parseInt(point.slotSize || '0', 10) || 0,
})),
aggregateType
);
lagDataDot = lagDataDot.slice(0, 29);
lagDataDot = lagDataDot.reverse();
let lagDataDot = lagPoints.map((point) => [
point.updatedAt,
point.slotSize,
]);
return lagDataDot.map((data) => ({
time: formatGraphLabel(new Date(data[0]), aggregateType),
time: formatGraphLabel(new Date(data[0]!), timeSince),
'Lag in MB': data[1],
}));
}, [lagPoints, aggregateType]);
}, [lagPoints, timeSince]);

const [mounted, setMounted] = useState(false);
useEffect(() => {
Expand Down Expand Up @@ -81,20 +78,28 @@ function LagGraph({ slotNames }: { slotNames: string[] }) {
<ReactSelect
className='w-1/4'
placeholder='Select a replication slot'
options={slotNames.map((slotName) => ({
label: slotName,
value: slotName,
}))}
options={
slotNames.length === 0
? undefined
: slotNames.map((slotName) => ({
label: slotName,
value: slotName,
}))
}
onChange={(val, _) => val && handleChange(val.value)}
defaultValue={{ value: selectedSlot, label: selectedSlot }}
defaultValue={
selectedSlot
? { value: selectedSlot, label: selectedSlot }
: undefined
}
/>

<ReactSelect
id={aggregateType}
id={timeSince}
placeholder='Select a timeframe'
options={timeOptions}
defaultValue={{ label: 'hour', value: 'hour' }}
onChange={(val, _) => val && setAggregateType(val.value)}
onChange={(val, _) => val && setTimeSince(val.value)}
/>
</div>
<LineChart
Expand Down
Loading