Skip to content

Commit

Permalink
make summary even faster (#996)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Jan 5, 2024
1 parent 0900c31 commit 2d6a68a
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 95 deletions.
70 changes: 66 additions & 4 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,13 @@ func (h *FlowRequestHandler) CDCFlowStatus(
return nil, err
}

cloneStatuses := []*protos.QRepMirrorStatus{}
cloneStatuses := []*protos.CloneTableSummary{}
for _, cloneJobName := range cloneJobNames {
cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: cloneJobName,
})
cloneStatus, err := h.cloneTableSummary(ctx, cloneJobName)
if err != nil {
return nil, err
}

cloneStatuses = append(cloneStatuses, cloneStatus)
}

Expand All @@ -106,6 +105,69 @@ func (h *FlowRequestHandler) CDCFlowStatus(
}, nil
}

func (h *FlowRequestHandler) cloneTableSummary(
ctx context.Context,
flowJobName string,
) (*protos.CloneTableSummary, error) {
cfg := h.getQRepConfigFromCatalog(flowJobName)
res := &protos.CloneTableSummary{
FlowJobName: flowJobName,
TableName: cfg.DestinationTableIdentifier,
}

q := `
SELECT
MIN(start_time) AS StartTime,
COUNT(*) AS NumPartitionsTotal,
COUNT(CASE WHEN end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(rows_in_partition) FILTER (WHERE end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (end_time - start_time)) * 1000) FILTER (WHERE end_time IS NOT NULL) AS AvgTimePerPartitionMs
FROM
peerdb_stats.qrep_partitions
WHERE
flow_name = $1;
`

var startTime pgtype.Timestamp
var numPartitionsTotal pgtype.Int8
var numPartitionsCompleted pgtype.Int8
var numRowsSynced pgtype.Int8
var avgTimePerPartitionMs pgtype.Float8

err := h.pool.QueryRow(ctx, q, flowJobName).Scan(
&startTime,
&numPartitionsTotal,
&numPartitionsCompleted,
&numRowsSynced,
&avgTimePerPartitionMs,
)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
}

if startTime.Valid {
res.StartTime = timestamppb.New(startTime.Time)
}

if numPartitionsTotal.Valid {
res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
}

if numPartitionsCompleted.Valid {
res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64)
}

if numRowsSynced.Valid {
res.NumRowsSynced = numRowsSynced.Int64
}

if avgTimePerPartitionMs.Valid {
res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64)
}

return res, nil
}

func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
Expand Down
12 changes: 11 additions & 1 deletion protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,18 @@ message PeerStatResponse {
repeated StatInfo stat_data = 1;
}

message CloneTableSummary {
string table_name = 1;
google.protobuf.Timestamp start_time = 2;
int32 num_partitions_completed = 3;
int32 num_partitions_total = 4;
int64 num_rows_synced = 5;
int64 avg_time_per_partition_ms = 6;
string flow_job_name = 7;
}

message SnapshotStatus {
repeated QRepMirrorStatus clones = 1;
repeated CloneTableSummary clones = 1;
}

message CDCMirrorStatus {
Expand Down
116 changes: 43 additions & 73 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,91 +3,55 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import TimeLabel from '@/components/TimeComponent';
import {
CDCMirrorStatus,
QRepMirrorStatus,
CloneTableSummary,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressBar } from '@/lib/ProgressBar';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react';
import moment, { Duration, Moment } from 'moment';
import Link from 'next/link';
import { useEffect, useMemo, useState } from 'react';
import ReactSelect from 'react-select';
import { useLocalStorage } from 'usehooks-ts';
import CdcDetails from './cdcDetails';

class TableCloneSummary {
flowJobName: string;
tableName: string;
totalNumPartitions: number;
totalNumRows: number;
completedNumPartitions: number;
completedNumRows: number;
avgTimePerPartition: Duration | null;
cloneStartTime: Moment | null;
cloneStartTime: Moment | null = null;
cloneTableSummary: CloneTableSummary;
avgTimePerPartition: Duration | null = null;

constructor(clone: QRepMirrorStatus) {
this.flowJobName = clone.config?.flowJobName || '';
this.tableName = clone.config?.watermarkTable || '';
this.totalNumPartitions = 0;
this.totalNumRows = 0;
this.completedNumPartitions = 0;
this.completedNumRows = 0;
this.avgTimePerPartition = null;
this.cloneStartTime = null;

this.calculate(clone);
}

private calculate(clone: QRepMirrorStatus): void {
let totalTime = moment.duration(0);
clone.partitions?.forEach((partition) => {
this.totalNumPartitions++;
this.totalNumRows += partition.numRows;

if (partition.startTime) {
let st = moment(partition.startTime);
if (!this.cloneStartTime || st.isBefore(this.cloneStartTime)) {
this.cloneStartTime = st;
}
}

if (partition.endTime) {
this.completedNumPartitions++;
this.completedNumRows += partition.numRows;
let st = moment(partition.startTime);
let et = moment(partition.endTime);
let duration = moment.duration(et.diff(st));
totalTime = totalTime.add(duration);
}
});

if (this.completedNumPartitions > 0) {
constructor(clone: CloneTableSummary) {
this.cloneTableSummary = clone;
if (clone.startTime) {
this.cloneStartTime = moment(clone.startTime);
}
if (clone.avgTimePerPartitionMs) {
this.avgTimePerPartition = moment.duration(
totalTime.asMilliseconds() / this.completedNumPartitions
clone.avgTimePerPartitionMs,
'ms'
);
}
}

getRowProgressPercentage(): number {
if (this.totalNumRows === 0) {
return 0;
}
return (this.completedNumRows / this.totalNumRows) * 100;
}

getPartitionProgressPercentage(): number {
if (this.totalNumPartitions === 0) {
if (this.cloneTableSummary.numPartitionsTotal === 0) {
return 0;
}
return (this.completedNumPartitions / this.totalNumPartitions) * 100;
return (
(this.cloneTableSummary.numPartitionsCompleted /
this.cloneTableSummary.numPartitionsTotal) *
100
);
}
}

function summarizeTableClone(clone: QRepMirrorStatus): TableCloneSummary {
function summarizeTableClone(clone: CloneTableSummary): TableCloneSummary {
return new TableCloneSummary(clone);
}

Expand All @@ -106,8 +70,11 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
const [searchQuery, setSearchQuery] = useState<string>('');
const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc');
const displayedRows = useMemo(() => {
const shownRows = allRows.filter((row: any) =>
row.tableName.toLowerCase().includes(searchQuery.toLowerCase())
const shownRows = allRows.filter(
(row: TableCloneSummary) =>
row.cloneTableSummary.tableName
?.toLowerCase()
.includes(searchQuery.toLowerCase())
);
shownRows.sort((a, b) => {
const aValue = a[sortField];
Expand Down Expand Up @@ -231,10 +198,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
<TableCell>
<Label>
<Link
href={`/mirrors/status/qrep/${clone.flowJobName}`}
href={`/mirrors/status/qrep/${clone.cloneTableSummary.flowJobName}`}
className='underline cursor-pointer'
>
{clone.tableName}
{clone.cloneTableSummary.tableName}
</Link>
</Label>
</TableCell>
Expand All @@ -247,9 +214,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
</TableCell>
<TableCell>
<ProgressBar progress={clone.getPartitionProgressPercentage()} />
{clone.completedNumPartitions} / {clone.totalNumPartitions}
{clone.cloneTableSummary.numPartitionsCompleted} /{' '}
{clone.cloneTableSummary.numPartitionsTotal}
</TableCell>
<TableCell>{clone.completedNumRows}</TableCell>
<TableCell>{clone.cloneTableSummary.numRowsSynced}</TableCell>
<TableCell>
<Label>
{clone.avgTimePerPartition?.humanize({ ss: 1 }) || 'N/A'}
Expand All @@ -274,27 +242,29 @@ export function CDCMirror({
createdAt,
syncStatusChild,
}: CDCMirrorStatusProps) {
const [selectedTab, setSelectedTab] = useState(-1);
const LocalStorageTabKey = 'cdctab';

const [selectedTab, setSelectedTab] = useLocalStorage(LocalStorageTabKey, 0);
const [mounted, setMounted] = useState(false);
const handleTab = (index: number) => {
localStorage.setItem(LocalStorageTabKey, index.toString());
setSelectedTab(index);
};

let snapshot = <></>;
if (cdc.snapshotStatus) {
snapshot = <SnapshotStatusTable status={cdc.snapshotStatus} />;
}

useEffect(() => {
if (typeof window !== 'undefined') {
setSelectedTab(
parseInt(localStorage?.getItem(LocalStorageTabKey) ?? '0') | 0
);
}
setMounted(true);
}, []);

if (!mounted) {
return (
<div style={{ marginTop: '1rem' }}>
<Label>
<ProgressCircle variant='determinate_progress_circle' />
</Label>
</div>
);
}
return (
<TabGroup
index={selectedTab}
Expand Down
28 changes: 28 additions & 0 deletions ui/app/mirrors/edit/[mirrorId]/nomirror.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Button } from '@/lib/Button';
import { Label } from '@/lib/Label';
import Link from 'next/link';

const NoMirror = () => {
return (
<div
style={{
display: 'flex',
flexDirection: 'column',
alignItems: 'center',
rowGap: '1rem',
justifyContent: 'center',
}}
>
<Label variant='title2'>Oops! </Label>
<Label as='label' style={{ fontSize: 18 }}>
We were unable to fetch details of this mirror. Please confirm if this
mirror exists.
</Label>
<Link href='/mirrors'>
<Button style={{ padding: '1rem' }}>Back to mirrors page</Button>
</Link>
</div>
);
};

export default NoMirror;
5 changes: 5 additions & 0 deletions ui/app/mirrors/edit/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { LayoutMain } from '@/lib/Layout';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';
import { redirect } from 'next/navigation';
import { CDCMirror } from './cdc';
import NoMirror from './nomirror';
import SyncStatus from './syncStatus';

export const dynamic = 'force-dynamic';
Expand Down Expand Up @@ -54,6 +55,10 @@ export default async function EditMirror({
},
});

if (mirrorStatus.errorMessage) {
return <NoMirror />;
}

let syncStatusChild = <></>;
if (mirrorStatus.cdcStatus) {
let rowsSynced = syncs.reduce((acc, sync) => acc + sync.rows_in_batch, 0);
Expand Down
1 change: 1 addition & 0 deletions ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export default async function SyncStatus({
orderBy: {
start_time: 'desc',
},
distinct: ['batch_id'],
});

const rows = syncs.map((sync) => ({
Expand Down
Loading

0 comments on commit 2d6a68a

Please sign in to comment.