From 8bb0bf14884488a9b7d1ebf672c817dbcddc1847 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 17 Jan 2024 21:32:17 +0530 Subject: [PATCH] UI: more accurate metrics, cdc params (#1086) Replaces `start_time` with `end_time` in a couple of places. Fixes rows synced number in sync status tab Rename idle timeout in create cdc page to sync interval Correct default value of pull batch size to 1 mil Simplify UI of 'Active' display --- ui/app/mirrors/create/helpers/cdc.ts | 8 +++--- ui/app/mirrors/create/helpers/common.ts | 2 +- .../[mirrorId]/aggregatedCountsByInterval.ts | 7 ++++-- ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx | 22 ++++++++++++---- ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx | 6 ++--- ui/app/mirrors/edit/[mirrorId]/page.tsx | 25 ++++++++++++------- ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx | 24 +++--------------- 7 files changed, 49 insertions(+), 45 deletions(-) diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index 72f4321f23..40ae5405b8 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -17,15 +17,15 @@ export const cdcSettings: MirrorSetting[] = [ stateHandler: (value, setter) => setter((curr: CDCConfig) => ({ ...curr, - maxBatchSize: (value as number) || 100000, + maxBatchSize: (value as number) || 1000000, })), - tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 100,000 rows.', + tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 1,000,000 rows.', type: 'number', - default: '100000', + default: '1000000', advanced: true, }, { - label: 'Idle Timeout (Seconds)', + label: 'Sync Interval (Seconds)', stateHandler: (value, setter) => setter((curr: CDCConfig) => ({ ...curr, diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index cf1579fe05..6864c1bfb2 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -28,7 +28,7 @@ export const blankCDCSetting: FlowConnectionConfigs = { srcTableIdNameMapping: {}, tableNameSchemaMapping: {}, metadataPeer: undefined, - maxBatchSize: 100000, + maxBatchSize: 1000000, doInitialCopy: true, publicationName: '', snapshotNumRowsPerPartition: 500000, diff --git a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts index 8abda13b25..d9aba603b8 100644 --- a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts +++ b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts @@ -1,7 +1,7 @@ import moment from 'moment'; type timestampType = { - timestamp: Date; + timestamp: Date | null; count: number; }; @@ -45,7 +45,7 @@ function aggregateCountsByInterval( N = 15; } - const currTs = new Date(timestamp); + const currTs = new Date(timestamp ?? 0); const date = roundUpToNearestNMinutes(currTs, N); const formattedTimestamp = moment(date).format(timeUnit); @@ -67,6 +67,9 @@ function aggregateCountsByInterval( if (interval === '5min') { currentTimestamp = roundUpToNearestNMinutes(currentTimestamp, 5); } + if (interval === '1min') { + currentTimestamp = roundUpToNearestNMinutes(currentTimestamp, 1); + } while (intervals.length < 30) { intervals.push(moment(currentTimestamp).format(timeUnit)); diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index 03f80de7f4..5a635eee1e 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -5,7 +5,6 @@ import PeerButton from '@/components/PeerComponent'; import TimeLabel from '@/components/TimeComponent'; import { FlowConnectionConfigs } from '@/grpc_generated/flow'; import { dBTypeFromJSON } from '@/grpc_generated/peers'; -import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import moment from 'moment'; import Link from 'next/link'; @@ -18,8 +17,13 @@ type props = { createdAt?: Date; }; function CdcDetails({ syncs, createdAt, mirrorConfig }: props) { - let lastSyncedAt = moment(syncs[0]?.startTime).fromNow(); - let rowsSynced = syncs.reduce((acc, sync) => acc + sync.numRows, 0); + let lastSyncedAt = moment(syncs[0]?.endTime).fromNow(); + let rowsSynced = syncs.reduce((acc, sync) => { + if (sync.endTime !== null) { + return acc + sync.numRows; + } + return acc; + }, 0); const tablesSynced = mirrorConfig?.tableMappings; return ( @@ -32,9 +36,17 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) { Status -
+
- Active +
diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx index 5df84d50d1..94789dfd05 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx @@ -1,6 +1,6 @@ 'use client'; import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; -import { timeOptions } from '@/app/utils/graph'; +import { formatGraphLabel, timeOptions } from '@/app/utils/graph'; import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; import { useMemo, useState } from 'react'; @@ -12,14 +12,14 @@ function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) { const graphValues = useMemo(() => { const rows = syncs.map((sync) => ({ - timestamp: sync.startTime, + timestamp: sync.endTime, count: sync.numRows, })); let timedRowCounts = aggregateCountsByInterval(rows, aggregateType); timedRowCounts = timedRowCounts.slice(0, 29); timedRowCounts = timedRowCounts.reverse(); return timedRowCounts.map((count) => ({ - name: count[0], + name: formatGraphLabel(new Date(count[0]), aggregateType), 'Rows synced at a point in time': count[1], })); }, [syncs, aggregateType]); diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/edit/[mirrorId]/page.tsx index 4f68dcf20c..d472b849e7 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/page.tsx @@ -1,3 +1,4 @@ +import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import prisma from '@/app/utils/prisma'; import { MirrorStatusResponse } from '@/grpc_generated/route'; import { Header } from '@/lib/Header'; @@ -51,29 +52,35 @@ export default async function EditMirror({ orderBy: { start_time: 'desc', }, + distinct: ['batch_id'], }); + const rows: SyncStatusRow[] = syncs.map((sync) => ({ + batchId: sync.batch_id, + startTime: sync.start_time, + endTime: sync.end_time, + numRows: sync.rows_in_batch, + })); + if (mirrorStatus.errorMessage) { return ; } let syncStatusChild = <>; if (mirrorStatus.cdcStatus) { - let rowsSynced = syncs.reduce((acc, sync) => acc + sync.rows_in_batch, 0); + let rowsSynced = syncs.reduce((acc, sync) => { + if (sync.end_time !== null) { + return acc + sync.rows_in_batch; + } + return acc; + }, 0); syncStatusChild = ( - + ); } else { redirect(`/mirrors/status/qrep/${mirrorId}`); } - const rows = syncs.map((sync) => ({ - batchId: sync.batch_id, - startTime: sync.start_time, - endTime: sync.end_time, - numRows: sync.rows_in_batch, - })); - return (
{mirrorId}
diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx index b60f75ce11..ba50ff67b8 100644 --- a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx @@ -1,4 +1,4 @@ -import prisma from '@/app/utils/prisma'; +import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import { Label } from '@/lib/Label'; import CdcGraph from './cdcGraph'; import { SyncStatusTable } from './syncStatusTable'; @@ -9,36 +9,18 @@ function numberWithCommas(x: Number): string { type SyncStatusProps = { flowJobName: string | undefined; rowsSynced: Number; + rows: SyncStatusRow[]; }; export default async function SyncStatus({ flowJobName, rowsSynced, + rows, }: SyncStatusProps) { if (!flowJobName) { return
Flow job name not provided!
; } - const syncs = await prisma.cdc_batches.findMany({ - where: { - flow_name: flowJobName, - start_time: { - not: undefined, - }, - }, - orderBy: { - start_time: 'desc', - }, - distinct: ['batch_id'], - }); - - const rows = syncs.map((sync) => ({ - batchId: sync.batch_id, - startTime: sync.start_time, - endTime: sync.end_time, - numRows: sync.rows_in_batch, - })); - return (