Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make qrep status more useful #522

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
shutdown <- true
}()

startTime := time.Now()
partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
Expand All @@ -428,7 +427,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
ctx,
config,
runUUID,
startTime,
partitions,
)
if err != nil {
Expand All @@ -447,6 +445,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID)
if err != nil {
return fmt.Errorf("failed to update start time for qrep run: %w", err)
}

numPartitions := len(partitions.Partitions)
log.Infof("replicating partitions for job - %s - batch %d - size: %d\n",
config.FlowJobName, partitions.BatchId, numPartitions)
Expand All @@ -469,6 +472,11 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
Expand Down
43 changes: 37 additions & 6 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
ctx context.Context,
config *protos.QRepConfig,
runUUID string,
startTime time.Time,
partitions []*protos.QRepPartition,
) error {
if c == nil || c.catalogConn == nil {
Expand All @@ -166,8 +165,8 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(

flowJobName := config.GetFlowJobName()
_, err := c.catalogConn.Exec(ctx,
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid,start_time) VALUES($1,$2,$3) ON CONFLICT DO NOTHING",
flowJobName, runUUID, startTime)
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid) VALUES($1,$2) ON CONFLICT DO NOTHING",
flowJobName, runUUID)
if err != nil {
return fmt.Errorf("error while inserting qrep run in qrep_runs: %w", err)
}
Expand All @@ -193,6 +192,21 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
return nil
}

func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, runUUID string) error {
if c == nil || c.catalogConn == nil {
return nil
}

_, err := c.catalogConn.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
}

return nil
}

func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runUUID string) error {
if c == nil || c.catalogConn == nil {
return nil
Expand Down Expand Up @@ -253,17 +267,34 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo

_, err := c.catalogConn.Exec(ctx,
`INSERT INTO peerdb_stats.qrep_partitions
(flow_name,run_uuid,partition_uuid,partition_start,partition_end,start_time,restart_count)
VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
(flow_name,run_uuid,partition_uuid,partition_start,partition_end,restart_count)
VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
restart_count=qrep_partitions.restart_count+1`,
flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, time.Now(), 0)
flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, 0)
if err != nil {
return fmt.Errorf("error while inserting qrep partition in qrep_partitions: %w", err)
}

return nil
}

func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition(
ctx context.Context,
runUUID string,
partition *protos.QRepPartition,
) error {
if c == nil || c.catalogConn == nil {
return nil
}

_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId)
if err != nil {
return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
}
return nil
}

func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context.Context, runUUID string,
partition *protos.QRepPartition, rowsInPartition int64) error {
if c == nil || c.catalogConn == nil {
Expand Down
19 changes: 19 additions & 0 deletions nexus/catalog/migrations/V10__mirror_drop_bad_constraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Drop the foreign key constraint from qrep_partitions to qrep_runs
ALTER TABLE peerdb_stats.qrep_partitions
DROP CONSTRAINT fk_qrep_partitions_run_uuid;

-- Drop the unique constraint for flow_name from qrep_runs
ALTER TABLE peerdb_stats.qrep_runs
DROP CONSTRAINT uq_qrep_runs_flow_name;

-- Add unique constraint to qrep_runs for (flow_name, run_uuid)
ALTER TABLE peerdb_stats.qrep_runs
ADD CONSTRAINT uq_qrep_runs_flow_run
UNIQUE (flow_name, run_uuid);

-- Add foreign key from qrep_partitions to qrep_runs
ALTER TABLE peerdb_stats.qrep_partitions
ADD CONSTRAINT fk_qrep_partitions_run
FOREIGN KEY (flow_name, run_uuid)
REFERENCES peerdb_stats.qrep_runs(flow_name, run_uuid)
ON DELETE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE peerdb_stats.qrep_runs
ALTER COLUMN start_time DROP NOT NULL;

ALTER TABLE peerdb_stats.qrep_partitions
ALTER COLUMN start_time DROP NOT NULL;
12 changes: 11 additions & 1 deletion ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { ProgressBar } from '@/lib/ProgressBar';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import moment, { Duration, Moment } from 'moment';
import Link from 'next/link';

const Badges = [
<Badge variant='positive' key={1}>
Expand All @@ -35,6 +36,7 @@ const Badges = [
];

class TableCloneSummary {
flowJobName: string;
tableName: string;
totalNumPartitions: number;
totalNumRows: number;
Expand All @@ -44,6 +46,7 @@ class TableCloneSummary {
cloneStartTime: Moment | null;

constructor(clone: QRepMirrorStatus) {
this.flowJobName = clone.config?.flowJobName || '';
this.tableName = clone.config?.watermarkTable || '';
this.totalNumPartitions = 0;
this.totalNumRows = 0;
Expand Down Expand Up @@ -151,7 +154,14 @@ const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => (
<Checkbox />
</TableCell>
<TableCell>
<Label>{clone.tableName}</Label>
<Label>
<Link
href={`/mirrors/status/qrep/${clone.flowJobName}`}
className='underline cursor-pointer'
>
{clone.tableName}
</Link>
</Label>
</TableCell>
<TableCell>
<Label>{clone.cloneStartTime?.format('YYYY-MM-DD HH:mm:ss')}</Label>
Expand Down
Empty file.
43 changes: 43 additions & 0 deletions ui/app/mirrors/status/qrep/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import prisma from '@/app/utils/prisma';
import { Header } from '@/lib/Header';
import { LayoutMain } from '@/lib/Layout';
import QRepStatusTable, { QRepPartitionStatus } from './qrepStatusTable';

type QRepMirrorStatusProps = {
params: { mirrorId: string };
};

export default async function QRepMirrorStatus({
params: { mirrorId },
}: QRepMirrorStatusProps) {
const runs = await prisma.qrep_partitions.findMany({
where: {
flow_name: mirrorId,
start_time: {
not: null,
},
},
orderBy: {
start_time: 'desc',
},
});

const partitions = runs.map((run) => {
let ret: QRepPartitionStatus = {
partitionId: run.partition_uuid,
runUuid: run.run_uuid,
startTime: run.start_time,
endTime: run.end_time,
numRows: run.rows_in_partition,
status: '',
};
return ret;
});

return (
<LayoutMain alignSelf='flex-start' justifySelf='flex-start' width='full'>
<Header variant='title2'>{mirrorId}</Header>
<QRepStatusTable flowJobName={mirrorId} partitions={partitions} />
</LayoutMain>
);
}
146 changes: 146 additions & 0 deletions ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
'use client';

import { Button } from '@/lib/Button';
import { Checkbox } from '@/lib/Checkbox';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import moment from 'moment';
import { useState } from 'react';

export type QRepPartitionStatus = {
partitionId: string;
runUuid: string;
status: string;
startTime: Date | null;
endTime: Date | null;
numRows: number | null;
};

function TimeOrProgressBar({ time }: { time: Date | null }) {
if (time === null) {
return <ProgressCircle variant='determinate_progress_circle' />;
} else {
return <Label>{moment(time)?.format('YYYY-MM-DD HH:mm:ss')}</Label>;
}
}

function RowPerPartition({
partitionId,
runUuid,
status,
startTime,
endTime,
numRows,
}: QRepPartitionStatus) {
return (
<TableRow key={partitionId}>
<TableCell variant='button'>
<Checkbox />
</TableCell>
<TableCell>
<Label>{partitionId}</Label>
</TableCell>
<TableCell>
<Label>{runUuid}</Label>
</TableCell>
<TableCell>
<Label>{moment(startTime)?.format('YYYY-MM-DD HH:mm:ss')}</Label>
</TableCell>
<TableCell>
<Label>
<TimeOrProgressBar time={endTime} />
</Label>
</TableCell>
<TableCell>
<Label>{numRows}</Label>
</TableCell>
</TableRow>
);
}

type QRepStatusTableProps = {
flowJobName: string;
partitions: QRepPartitionStatus[];
};

export default function QRepStatusTable({
flowJobName,
partitions,
}: QRepStatusTableProps) {
const ROWS_PER_PAGE = 10;
const [currentPage, setCurrentPage] = useState(1);
const totalPages = Math.ceil(partitions.length / ROWS_PER_PAGE);

const visiblePartitions = partitions.slice(
(currentPage - 1) * ROWS_PER_PAGE,
currentPage * ROWS_PER_PAGE
);

const handleNext = () => {
if (currentPage < totalPages) setCurrentPage(currentPage + 1);
};

const handlePrevious = () => {
if (currentPage > 1) setCurrentPage(currentPage - 1);
};

return (
<Table
title={<Label variant='headline'>Progress</Label>}
toolbar={{
left: (
<>
<Button
variant='normalBorderless'
onClick={handlePrevious}
disabled={currentPage === 1}
>
<Icon name='chevron_left' />
</Button>
<Button
variant='normalBorderless'
onClick={handleNext}
disabled={currentPage === totalPages}
>
<Icon name='chevron_right' />
</Button>
<Button variant='normalBorderless'>
<Icon name='refresh' />
</Button>
<Button variant='normalBorderless'>
<Icon name='help' />
</Button>
<Button variant='normalBorderless' disabled>
<Icon name='download' />
</Button>
<div>
<Label>
{currentPage} of {totalPages}
</Label>
</div>
</>
),
right: <SearchField placeholder='Search' />,
}}
header={
<TableRow>
<TableCell as='th' variant='button'>
<Checkbox variant='mixed' defaultChecked />
</TableCell>
<TableCell as='th'>Partition UUID</TableCell>
<TableCell as='th'>Run UUID</TableCell>
<TableCell as='th'>Start Time</TableCell>
<TableCell as='th'>End Time</TableCell>
<TableCell as='th'>Num Rows Synced</TableCell>
</TableRow>
}
>
{visiblePartitions.map((partition, index) => (
<RowPerPartition key={index} {...partition} />
))}
</Table>
);
}
Loading