Skip to content

Commit

Permalink
UI: more accurate metrics, cdc params (#1086)
Browse files Browse the repository at this point in the history
Replaces `start_time` with `end_time` in a couple of places.
Fixes rows synced number in sync status tab
Rename idle timeout in create cdc page to sync interval
Correct default value of pull batch size to 1 mil
Simplify UI of 'Active' display
  • Loading branch information
Amogh-Bharadwaj authored Jan 17, 2024
1 parent 53bec30 commit 8bb0bf1
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 45 deletions.
8 changes: 4 additions & 4 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ export const cdcSettings: MirrorSetting[] = [
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
maxBatchSize: (value as number) || 100000,
maxBatchSize: (value as number) || 1000000,
})),
tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 100,000 rows.',
tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 1,000,000 rows.',
type: 'number',
default: '100000',
default: '1000000',
advanced: true,
},
{
label: 'Idle Timeout (Seconds)',
label: 'Sync Interval (Seconds)',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
srcTableIdNameMapping: {},
tableNameSchemaMapping: {},
metadataPeer: undefined,
maxBatchSize: 100000,
maxBatchSize: 1000000,
doInitialCopy: true,
publicationName: '',
snapshotNumRowsPerPartition: 500000,
Expand Down
7 changes: 5 additions & 2 deletions ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import moment from 'moment';

type timestampType = {
timestamp: Date;
timestamp: Date | null;
count: number;
};

Expand Down Expand Up @@ -45,7 +45,7 @@ function aggregateCountsByInterval(
N = 15;
}

const currTs = new Date(timestamp);
const currTs = new Date(timestamp ?? 0);
const date = roundUpToNearestNMinutes(currTs, N);
const formattedTimestamp = moment(date).format(timeUnit);

Expand All @@ -67,6 +67,9 @@ function aggregateCountsByInterval(
if (interval === '5min') {
currentTimestamp = roundUpToNearestNMinutes(currentTimestamp, 5);
}
if (interval === '1min') {
currentTimestamp = roundUpToNearestNMinutes(currentTimestamp, 1);
}

while (intervals.length < 30) {
intervals.push(moment(currentTimestamp).format(timeUnit));
Expand Down
22 changes: 17 additions & 5 deletions ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import PeerButton from '@/components/PeerComponent';
import TimeLabel from '@/components/TimeComponent';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { dBTypeFromJSON } from '@/grpc_generated/peers';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import moment from 'moment';
import Link from 'next/link';
Expand All @@ -18,8 +17,13 @@ type props = {
createdAt?: Date;
};
function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
let lastSyncedAt = moment(syncs[0]?.startTime).fromNow();
let rowsSynced = syncs.reduce((acc, sync) => acc + sync.numRows, 0);
let lastSyncedAt = moment(syncs[0]?.endTime).fromNow();
let rowsSynced = syncs.reduce((acc, sync) => {
if (sync.endTime !== null) {
return acc + sync.numRows;
}
return acc;
}, 0);

const tablesSynced = mirrorConfig?.tableMappings;
return (
Expand All @@ -32,9 +36,17 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
Status
</Label>
</div>
<div className='cursor-pointer underline'>
<div
style={{
padding: '0.2rem',
width: 'fit-content',
borderRadius: '1rem',
border: '1px solid rgba(0,0,0,0.1)',
cursor: 'pointer',
}}
>
<Link href={`/mirrors/errors/${mirrorConfig?.flowJobName}`}>
Active <Icon name='description' />
<Label> Active </Label>
</Link>
</div>
</div>
Expand Down
6 changes: 3 additions & 3 deletions ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use client';
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import { timeOptions } from '@/app/utils/graph';
import { formatGraphLabel, timeOptions } from '@/app/utils/graph';
import { Label } from '@/lib/Label';
import { BarChart } from '@tremor/react';
import { useMemo, useState } from 'react';
Expand All @@ -12,14 +12,14 @@ function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) {

const graphValues = useMemo(() => {
const rows = syncs.map((sync) => ({
timestamp: sync.startTime,
timestamp: sync.endTime,
count: sync.numRows,
}));
let timedRowCounts = aggregateCountsByInterval(rows, aggregateType);
timedRowCounts = timedRowCounts.slice(0, 29);
timedRowCounts = timedRowCounts.reverse();
return timedRowCounts.map((count) => ({
name: count[0],
name: formatGraphLabel(new Date(count[0]), aggregateType),
'Rows synced at a point in time': count[1],
}));
}, [syncs, aggregateType]);
Expand Down
25 changes: 16 additions & 9 deletions ui/app/mirrors/edit/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import prisma from '@/app/utils/prisma';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Header } from '@/lib/Header';
Expand Down Expand Up @@ -51,29 +52,35 @@ export default async function EditMirror({
orderBy: {
start_time: 'desc',
},
distinct: ['batch_id'],
});

const rows: SyncStatusRow[] = syncs.map((sync) => ({
batchId: sync.batch_id,
startTime: sync.start_time,
endTime: sync.end_time,
numRows: sync.rows_in_batch,
}));

if (mirrorStatus.errorMessage) {
return <NoMirror />;
}

let syncStatusChild = <></>;
if (mirrorStatus.cdcStatus) {
let rowsSynced = syncs.reduce((acc, sync) => acc + sync.rows_in_batch, 0);
let rowsSynced = syncs.reduce((acc, sync) => {
if (sync.end_time !== null) {
return acc + sync.rows_in_batch;
}
return acc;
}, 0);
syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} flowJobName={mirrorId} />
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);
} else {
redirect(`/mirrors/status/qrep/${mirrorId}`);
}

const rows = syncs.map((sync) => ({
batchId: sync.batch_id,
startTime: sync.start_time,
endTime: sync.end_time,
numRows: sync.rows_in_batch,
}));

return (
<LayoutMain alignSelf='flex-start' justifySelf='flex-start' width='full'>
<Header variant='title2'>{mirrorId}</Header>
Expand Down
24 changes: 3 additions & 21 deletions ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import prisma from '@/app/utils/prisma';
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import { Label } from '@/lib/Label';
import CdcGraph from './cdcGraph';
import { SyncStatusTable } from './syncStatusTable';
Expand All @@ -9,36 +9,18 @@ function numberWithCommas(x: Number): string {
type SyncStatusProps = {
flowJobName: string | undefined;
rowsSynced: Number;
rows: SyncStatusRow[];
};

export default async function SyncStatus({
flowJobName,
rowsSynced,
rows,
}: SyncStatusProps) {
if (!flowJobName) {
return <div>Flow job name not provided!</div>;
}

const syncs = await prisma.cdc_batches.findMany({
where: {
flow_name: flowJobName,
start_time: {
not: undefined,
},
},
orderBy: {
start_time: 'desc',
},
distinct: ['batch_id'],
});

const rows = syncs.map((sync) => ({
batchId: sync.batch_id,
startTime: sync.start_time,
endTime: sync.end_time,
numRows: sync.rows_in_batch,
}));

return (
<div>
<div className='flex items-center'>
Expand Down

0 comments on commit 8bb0bf1

Please sign in to comment.