From 2d6a68a5a380d84f02a07cdd36908ca4df14f1b6 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 5 Jan 2024 17:10:19 -0500 Subject: [PATCH] make summary even faster (#996) --- flow/cmd/mirror_status.go | 70 ++++++++++- protos/route.proto | 12 +- ui/app/mirrors/edit/[mirrorId]/cdc.tsx | 116 +++++++----------- ui/app/mirrors/edit/[mirrorId]/nomirror.tsx | 28 +++++ ui/app/mirrors/edit/[mirrorId]/page.tsx | 5 + ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx | 1 + .../qrep/[mirrorId]/qrepStatusTable.tsx | 32 ++--- ui/app/mirrors/tables.tsx | 1 - ui/components/TimeComponent.tsx | 13 ++ 9 files changed, 183 insertions(+), 95 deletions(-) create mode 100644 ui/app/mirrors/edit/[mirrorId]/nomirror.tsx diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 19c818b91e..6c29bd39b0 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -85,14 +85,13 @@ func (h *FlowRequestHandler) CDCFlowStatus( return nil, err } - cloneStatuses := []*protos.QRepMirrorStatus{} + cloneStatuses := []*protos.CloneTableSummary{} for _, cloneJobName := range cloneJobNames { - cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{ - FlowJobName: cloneJobName, - }) + cloneStatus, err := h.cloneTableSummary(ctx, cloneJobName) if err != nil { return nil, err } + cloneStatuses = append(cloneStatuses, cloneStatus) } @@ -106,6 +105,69 @@ func (h *FlowRequestHandler) CDCFlowStatus( }, nil } +func (h *FlowRequestHandler) cloneTableSummary( + ctx context.Context, + flowJobName string, +) (*protos.CloneTableSummary, error) { + cfg := h.getQRepConfigFromCatalog(flowJobName) + res := &protos.CloneTableSummary{ + FlowJobName: flowJobName, + TableName: cfg.DestinationTableIdentifier, + } + + q := ` + SELECT + MIN(start_time) AS StartTime, + COUNT(*) AS NumPartitionsTotal, + COUNT(CASE WHEN end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted, + SUM(rows_in_partition) FILTER (WHERE end_time IS NOT NULL) AS NumRowsSynced, + AVG(EXTRACT(EPOCH FROM (end_time - start_time)) * 1000) FILTER (WHERE end_time IS NOT NULL) AS AvgTimePerPartitionMs + FROM + peerdb_stats.qrep_partitions + WHERE + flow_name = $1; + ` + + var startTime pgtype.Timestamp + var numPartitionsTotal pgtype.Int8 + var numPartitionsCompleted pgtype.Int8 + var numRowsSynced pgtype.Int8 + var avgTimePerPartitionMs pgtype.Float8 + + err := h.pool.QueryRow(ctx, q, flowJobName).Scan( + &startTime, + &numPartitionsTotal, + &numPartitionsCompleted, + &numRowsSynced, + &avgTimePerPartitionMs, + ) + if err != nil { + return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err) + } + + if startTime.Valid { + res.StartTime = timestamppb.New(startTime.Time) + } + + if numPartitionsTotal.Valid { + res.NumPartitionsTotal = int32(numPartitionsTotal.Int64) + } + + if numPartitionsCompleted.Valid { + res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64) + } + + if numRowsSynced.Valid { + res.NumRowsSynced = numRowsSynced.Int64 + } + + if avgTimePerPartitionMs.Valid { + res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64) + } + + return res, nil +} + func (h *FlowRequestHandler) QRepFlowStatus( ctx context.Context, req *protos.MirrorStatusRequest, diff --git a/protos/route.proto b/protos/route.proto index 3fead611de..db330104b3 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -162,8 +162,18 @@ message PeerStatResponse { repeated StatInfo stat_data = 1; } +message CloneTableSummary { + string table_name = 1; + google.protobuf.Timestamp start_time = 2; + int32 num_partitions_completed = 3; + int32 num_partitions_total = 4; + int64 num_rows_synced = 5; + int64 avg_time_per_partition_ms = 6; + string flow_job_name = 7; +} + message SnapshotStatus { - repeated QRepMirrorStatus clones = 1; + repeated CloneTableSummary clones = 1; } message CDCMirrorStatus { diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index 52d21c4999..52103ee702 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -3,13 +3,14 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import TimeLabel from '@/components/TimeComponent'; import { CDCMirrorStatus, - QRepMirrorStatus, + CloneTableSummary, SnapshotStatus, } from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { ProgressBar } from '@/lib/ProgressBar'; +import { ProgressCircle } from '@/lib/ProgressCircle'; import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react'; @@ -17,77 +18,40 @@ import moment, { Duration, Moment } from 'moment'; import Link from 'next/link'; import { useEffect, useMemo, useState } from 'react'; import ReactSelect from 'react-select'; +import { useLocalStorage } from 'usehooks-ts'; import CdcDetails from './cdcDetails'; class TableCloneSummary { - flowJobName: string; - tableName: string; - totalNumPartitions: number; - totalNumRows: number; - completedNumPartitions: number; - completedNumRows: number; - avgTimePerPartition: Duration | null; - cloneStartTime: Moment | null; + cloneStartTime: Moment | null = null; + cloneTableSummary: CloneTableSummary; + avgTimePerPartition: Duration | null = null; - constructor(clone: QRepMirrorStatus) { - this.flowJobName = clone.config?.flowJobName || ''; - this.tableName = clone.config?.watermarkTable || ''; - this.totalNumPartitions = 0; - this.totalNumRows = 0; - this.completedNumPartitions = 0; - this.completedNumRows = 0; - this.avgTimePerPartition = null; - this.cloneStartTime = null; - - this.calculate(clone); - } - - private calculate(clone: QRepMirrorStatus): void { - let totalTime = moment.duration(0); - clone.partitions?.forEach((partition) => { - this.totalNumPartitions++; - this.totalNumRows += partition.numRows; - - if (partition.startTime) { - let st = moment(partition.startTime); - if (!this.cloneStartTime || st.isBefore(this.cloneStartTime)) { - this.cloneStartTime = st; - } - } - - if (partition.endTime) { - this.completedNumPartitions++; - this.completedNumRows += partition.numRows; - let st = moment(partition.startTime); - let et = moment(partition.endTime); - let duration = moment.duration(et.diff(st)); - totalTime = totalTime.add(duration); - } - }); - - if (this.completedNumPartitions > 0) { + constructor(clone: CloneTableSummary) { + this.cloneTableSummary = clone; + if (clone.startTime) { + this.cloneStartTime = moment(clone.startTime); + } + if (clone.avgTimePerPartitionMs) { this.avgTimePerPartition = moment.duration( - totalTime.asMilliseconds() / this.completedNumPartitions + clone.avgTimePerPartitionMs, + 'ms' ); } } - getRowProgressPercentage(): number { - if (this.totalNumRows === 0) { - return 0; - } - return (this.completedNumRows / this.totalNumRows) * 100; - } - getPartitionProgressPercentage(): number { - if (this.totalNumPartitions === 0) { + if (this.cloneTableSummary.numPartitionsTotal === 0) { return 0; } - return (this.completedNumPartitions / this.totalNumPartitions) * 100; + return ( + (this.cloneTableSummary.numPartitionsCompleted / + this.cloneTableSummary.numPartitionsTotal) * + 100 + ); } } -function summarizeTableClone(clone: QRepMirrorStatus): TableCloneSummary { +function summarizeTableClone(clone: CloneTableSummary): TableCloneSummary { return new TableCloneSummary(clone); } @@ -106,8 +70,11 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { const [searchQuery, setSearchQuery] = useState(''); const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc'); const displayedRows = useMemo(() => { - const shownRows = allRows.filter((row: any) => - row.tableName.toLowerCase().includes(searchQuery.toLowerCase()) + const shownRows = allRows.filter( + (row: TableCloneSummary) => + row.cloneTableSummary.tableName + ?.toLowerCase() + .includes(searchQuery.toLowerCase()) ); shownRows.sort((a, b) => { const aValue = a[sortField]; @@ -231,10 +198,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { @@ -247,9 +214,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { - {clone.completedNumPartitions} / {clone.totalNumPartitions} + {clone.cloneTableSummary.numPartitionsCompleted} /{' '} + {clone.cloneTableSummary.numPartitionsTotal} - {clone.completedNumRows} + {clone.cloneTableSummary.numRowsSynced}