Skip to content

Commit

Permalink
Make qrep status more useful (#522)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Oct 16, 2023
1 parent 5fb024f commit 90d467f
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 13 deletions.
12 changes: 10 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
shutdown <- true
}()

startTime := time.Now()
partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
Expand All @@ -428,7 +427,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
ctx,
config,
runUUID,
startTime,
partitions,
)
if err != nil {
Expand All @@ -447,6 +445,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID)
if err != nil {
return fmt.Errorf("failed to update start time for qrep run: %w", err)
}

numPartitions := len(partitions.Partitions)
log.Infof("replicating partitions for job - %s - batch %d - size: %d\n",
config.FlowJobName, partitions.BatchId, numPartitions)
Expand All @@ -469,6 +472,11 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
Expand Down
43 changes: 37 additions & 6 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
ctx context.Context,
config *protos.QRepConfig,
runUUID string,
startTime time.Time,
partitions []*protos.QRepPartition,
) error {
if c == nil || c.catalogConn == nil {
Expand All @@ -166,8 +165,8 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(

flowJobName := config.GetFlowJobName()
_, err := c.catalogConn.Exec(ctx,
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid,start_time) VALUES($1,$2,$3) ON CONFLICT DO NOTHING",
flowJobName, runUUID, startTime)
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid) VALUES($1,$2) ON CONFLICT DO NOTHING",
flowJobName, runUUID)
if err != nil {
return fmt.Errorf("error while inserting qrep run in qrep_runs: %w", err)
}
Expand All @@ -193,6 +192,21 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
return nil
}

func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, runUUID string) error {
if c == nil || c.catalogConn == nil {
return nil
}

_, err := c.catalogConn.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
}

return nil
}

func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runUUID string) error {
if c == nil || c.catalogConn == nil {
return nil
Expand Down Expand Up @@ -253,17 +267,34 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo

_, err := c.catalogConn.Exec(ctx,
`INSERT INTO peerdb_stats.qrep_partitions
(flow_name,run_uuid,partition_uuid,partition_start,partition_end,start_time,restart_count)
VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
(flow_name,run_uuid,partition_uuid,partition_start,partition_end,restart_count)
VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
restart_count=qrep_partitions.restart_count+1`,
flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, time.Now(), 0)
flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, 0)
if err != nil {
return fmt.Errorf("error while inserting qrep partition in qrep_partitions: %w", err)
}

return nil
}

func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition(
ctx context.Context,
runUUID string,
partition *protos.QRepPartition,
) error {
if c == nil || c.catalogConn == nil {
return nil
}

_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId)
if err != nil {
return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
}
return nil
}

func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context.Context, runUUID string,
partition *protos.QRepPartition, rowsInPartition int64) error {
if c == nil || c.catalogConn == nil {
Expand Down
19 changes: 19 additions & 0 deletions nexus/catalog/migrations/V10__mirror_drop_bad_constraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Drop the foreign key constraint from qrep_partitions to qrep_runs
ALTER TABLE peerdb_stats.qrep_partitions
DROP CONSTRAINT fk_qrep_partitions_run_uuid;

-- Drop the unique constraint for flow_name from qrep_runs
ALTER TABLE peerdb_stats.qrep_runs
DROP CONSTRAINT uq_qrep_runs_flow_name;

-- Add unique constraint to qrep_runs for (flow_name, run_uuid)
ALTER TABLE peerdb_stats.qrep_runs
ADD CONSTRAINT uq_qrep_runs_flow_run
UNIQUE (flow_name, run_uuid);

-- Add foreign key from qrep_partitions to qrep_runs
ALTER TABLE peerdb_stats.qrep_partitions
ADD CONSTRAINT fk_qrep_partitions_run
FOREIGN KEY (flow_name, run_uuid)
REFERENCES peerdb_stats.qrep_runs(flow_name, run_uuid)
ON DELETE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE peerdb_stats.qrep_runs
ALTER COLUMN start_time DROP NOT NULL;

ALTER TABLE peerdb_stats.qrep_partitions
ALTER COLUMN start_time DROP NOT NULL;
12 changes: 11 additions & 1 deletion ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { ProgressBar } from '@/lib/ProgressBar';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import moment, { Duration, Moment } from 'moment';
import Link from 'next/link';

const Badges = [
<Badge variant='positive' key={1}>
Expand All @@ -35,6 +36,7 @@ const Badges = [
];

class TableCloneSummary {
flowJobName: string;
tableName: string;
totalNumPartitions: number;
totalNumRows: number;
Expand All @@ -44,6 +46,7 @@ class TableCloneSummary {
cloneStartTime: Moment | null;

constructor(clone: QRepMirrorStatus) {
this.flowJobName = clone.config?.flowJobName || '';
this.tableName = clone.config?.watermarkTable || '';
this.totalNumPartitions = 0;
this.totalNumRows = 0;
Expand Down Expand Up @@ -151,7 +154,14 @@ const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => (
<Checkbox />
</TableCell>
<TableCell>
<Label>{clone.tableName}</Label>
<Label>
<Link
href={`/mirrors/status/qrep/${clone.flowJobName}`}
className='underline cursor-pointer'
>
{clone.tableName}
</Link>
</Label>
</TableCell>
<TableCell>
<Label>{clone.cloneStartTime?.format('YYYY-MM-DD HH:mm:ss')}</Label>
Expand Down
Empty file.
43 changes: 43 additions & 0 deletions ui/app/mirrors/status/qrep/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import prisma from '@/app/utils/prisma';
import { Header } from '@/lib/Header';
import { LayoutMain } from '@/lib/Layout';
import QRepStatusTable, { QRepPartitionStatus } from './qrepStatusTable';

type QRepMirrorStatusProps = {
params: { mirrorId: string };
};

export default async function QRepMirrorStatus({
params: { mirrorId },
}: QRepMirrorStatusProps) {
const runs = await prisma.qrep_partitions.findMany({
where: {
flow_name: mirrorId,
start_time: {
not: null,
},
},
orderBy: {
start_time: 'desc',
},
});

const partitions = runs.map((run) => {
let ret: QRepPartitionStatus = {
partitionId: run.partition_uuid,
runUuid: run.run_uuid,
startTime: run.start_time,
endTime: run.end_time,
numRows: run.rows_in_partition,
status: '',
};
return ret;
});

return (
<LayoutMain alignSelf='flex-start' justifySelf='flex-start' width='full'>
<Header variant='title2'>{mirrorId}</Header>
<QRepStatusTable flowJobName={mirrorId} partitions={partitions} />
</LayoutMain>
);
}
146 changes: 146 additions & 0 deletions ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
'use client';

import { Button } from '@/lib/Button';
import { Checkbox } from '@/lib/Checkbox';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import moment from 'moment';
import { useState } from 'react';

export type QRepPartitionStatus = {
partitionId: string;
runUuid: string;
status: string;
startTime: Date | null;
endTime: Date | null;
numRows: number | null;
};

function TimeOrProgressBar({ time }: { time: Date | null }) {
if (time === null) {
return <ProgressCircle variant='determinate_progress_circle' />;
} else {
return <Label>{moment(time)?.format('YYYY-MM-DD HH:mm:ss')}</Label>;
}
}

function RowPerPartition({
partitionId,
runUuid,
status,
startTime,
endTime,
numRows,
}: QRepPartitionStatus) {
return (
<TableRow key={partitionId}>
<TableCell variant='button'>
<Checkbox />
</TableCell>
<TableCell>
<Label>{partitionId}</Label>
</TableCell>
<TableCell>
<Label>{runUuid}</Label>
</TableCell>
<TableCell>
<Label>{moment(startTime)?.format('YYYY-MM-DD HH:mm:ss')}</Label>
</TableCell>
<TableCell>
<Label>
<TimeOrProgressBar time={endTime} />
</Label>
</TableCell>
<TableCell>
<Label>{numRows}</Label>
</TableCell>
</TableRow>
);
}

type QRepStatusTableProps = {
flowJobName: string;
partitions: QRepPartitionStatus[];
};

export default function QRepStatusTable({
flowJobName,
partitions,
}: QRepStatusTableProps) {
const ROWS_PER_PAGE = 10;
const [currentPage, setCurrentPage] = useState(1);
const totalPages = Math.ceil(partitions.length / ROWS_PER_PAGE);

const visiblePartitions = partitions.slice(
(currentPage - 1) * ROWS_PER_PAGE,
currentPage * ROWS_PER_PAGE
);

const handleNext = () => {
if (currentPage < totalPages) setCurrentPage(currentPage + 1);
};

const handlePrevious = () => {
if (currentPage > 1) setCurrentPage(currentPage - 1);
};

return (
<Table
title={<Label variant='headline'>Progress</Label>}
toolbar={{
left: (
<>
<Button
variant='normalBorderless'
onClick={handlePrevious}
disabled={currentPage === 1}
>
<Icon name='chevron_left' />
</Button>
<Button
variant='normalBorderless'
onClick={handleNext}
disabled={currentPage === totalPages}
>
<Icon name='chevron_right' />
</Button>
<Button variant='normalBorderless'>
<Icon name='refresh' />
</Button>
<Button variant='normalBorderless'>
<Icon name='help' />
</Button>
<Button variant='normalBorderless' disabled>
<Icon name='download' />
</Button>
<div>
<Label>
{currentPage} of {totalPages}
</Label>
</div>
</>
),
right: <SearchField placeholder='Search' />,
}}
header={
<TableRow>
<TableCell as='th' variant='button'>
<Checkbox variant='mixed' defaultChecked />
</TableCell>
<TableCell as='th'>Partition UUID</TableCell>
<TableCell as='th'>Run UUID</TableCell>
<TableCell as='th'>Start Time</TableCell>
<TableCell as='th'>End Time</TableCell>
<TableCell as='th'>Num Rows Synced</TableCell>
</TableRow>
}
>
{visiblePartitions.map((partition, index) => (
<RowPerPartition key={index} {...partition} />
))}
</Table>
);
}
Loading

0 comments on commit 90d467f

Please sign in to comment.