Skip to content

Commit

Permalink
UI: Initial load status for initial load tab (#1522)
Browse files Browse the repository at this point in the history
This PR adds user-facing information denoting the current phase of
snapshot flow - fetching, syncing or consolidate.

<img width="1528" alt="Screenshot 2024-03-21 at 11 15 49 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/ad0623e6-fb89-476a-9453-9647a41cf53f">
<img width="1528" alt="Screenshot 2024-03-21 at 11 16 39 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/9c067b1f-7ae2-446a-bead-c92190660b3f">

Functionally tested.
Fixes #1511
  • Loading branch information
Amogh-Bharadwaj authored Mar 25, 2024
1 parent 8f0c29a commit e1dbd98
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 251 deletions.
11 changes: 10 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,16 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
runUUID string,
) (*protos.QRepParitionResult, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
err := monitoring.InitializeQRepRun(
ctx,
a.CatalogPool,
config,
runUUID,
nil,
)
if err != nil {
return nil, err
}
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return nil, fmt.Errorf("failed to get qrep pull connector: %w", err)
Expand All @@ -555,7 +565,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
return "getting partitions for job"
})
defer shutdown()

partitions, err := srcConn.GetQRepPartitions(ctx, config, last)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down
28 changes: 21 additions & 7 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,24 @@ func (h *FlowRequestHandler) cloneTableSummary(
) ([]*protos.CloneTableSummary, error) {
q := `
SELECT
qp.flow_name,
distinct qr.flow_name,
qr.config_proto,
MIN(qp.start_time) AS StartTime,
COUNT(*) AS NumPartitionsTotal,
qr.start_time AS StartTime,
qr.fetch_complete as FetchCompleted,
qr.consolidate_complete as ConsolidateCompleted,
COUNT(CASE WHEN qp.flow_name IS NOT NULL THEN 1 END) AS NumPartitionsTotal,
COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs
FROM peerdb_stats.qrep_partitions qp
JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name
WHERE qp.flow_name ILIKE $1
GROUP BY qp.flow_name, qr.config_proto;
RIGHT JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name
WHERE qr.flow_name ILIKE $1
GROUP BY qr.flow_name, qr.config_proto, qr.start_time, qr.fetch_complete, qr.consolidate_complete;
`

var flowName pgtype.Text
var configBytes []byte
var fetchCompleted pgtype.Bool
var consolidateCompleted pgtype.Bool
var startTime pgtype.Timestamp
var numPartitionsTotal pgtype.Int8
var numPartitionsCompleted pgtype.Int8
Expand All @@ -157,6 +160,8 @@ func (h *FlowRequestHandler) cloneTableSummary(
&flowName,
&configBytes,
&startTime,
&fetchCompleted,
&consolidateCompleted,
&numPartitionsTotal,
&numPartitionsCompleted,
&numRowsSynced,
Expand All @@ -174,6 +179,14 @@ func (h *FlowRequestHandler) cloneTableSummary(
res.StartTime = timestamppb.New(startTime.Time)
}

if fetchCompleted.Valid {
res.FetchCompleted = fetchCompleted.Bool
}

if consolidateCompleted.Valid {
res.ConsolidateCompleted = consolidateCompleted.Bool
}

if numPartitionsTotal.Valid {
res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
}
Expand All @@ -197,6 +210,7 @@ func (h *FlowRequestHandler) cloneTableSummary(
return nil, fmt.Errorf("unable to unmarshal config: %w", err)
}
res.TableName = config.DestinationTableIdentifier
res.SourceTable = config.WatermarkTable
}

cloneStatuses = append(cloneStatuses, &res)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func InitializeQRepRun(

func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
"UPDATE peerdb_stats.qrep_runs SET start_time=$1, fetch_complete=true WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating start time for run_uuid %s in qrep_runs: %w", runUUID, err)
Expand All @@ -187,7 +187,7 @@ func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID

func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2",
"UPDATE peerdb_stats.qrep_runs SET end_time=$1, consolidate_complete=true WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating end time for run_uuid %s in qrep_runs: %w", runUUID, err)
Expand Down
11 changes: 11 additions & 0 deletions nexus/catalog/migrations/V24__qrep_runs_destination.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE peerdb_stats.qrep_runs
ADD COLUMN destination_table TEXT;

ALTER TABLE peerdb_stats.qrep_runs
ADD COLUMN source_table TEXT;

ALTER TABLE peerdb_stats.qrep_runs
ADD COLUMN fetch_complete BOOLEAN DEFAULT FALSE;

ALTER TABLE peerdb_stats.qrep_runs
ADD COLUMN consolidate_complete BOOLEAN DEFAULT FALSE;
3 changes: 3 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ message CloneTableSummary {
int64 num_rows_synced = 5;
int64 avg_time_per_partition_ms = 6;
string flow_job_name = 7;
string source_table = 8;
bool fetch_completed = 9;
bool consolidate_completed = 10;
}

message SnapshotStatus {
Expand Down
228 changes: 3 additions & 225 deletions ui/app/mirrors/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
@@ -1,235 +1,13 @@
'use client';
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import SelectTheme from '@/app/styles/select';
import TimeLabel from '@/components/TimeComponent';
import {
CloneTableSummary,
MirrorStatusResponse,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Label } from '@/lib/Label';
import { ProgressBar } from '@/lib/ProgressBar';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react';
import moment, { Duration, Moment } from 'moment';
import Link from 'next/link';
import { useEffect, useMemo, useState } from 'react';
import ReactSelect from 'react-select';
import { useEffect, useState } from 'react';
import { useLocalStorage } from 'usehooks-ts';
import CdcDetails from './cdcDetails';

class TableCloneSummary {
cloneStartTime: Moment | null = null;
cloneTableSummary: CloneTableSummary;
avgTimePerPartition: Duration | null = null;

constructor(clone: CloneTableSummary) {
this.cloneTableSummary = clone;
if (clone.startTime) {
this.cloneStartTime = moment(clone.startTime);
}
if (clone.avgTimePerPartitionMs) {
this.avgTimePerPartition = moment.duration(
clone.avgTimePerPartitionMs,
'ms'
);
}
}

getPartitionProgressPercentage(): number {
if (this.cloneTableSummary.numPartitionsTotal === 0) {
return 0;
}
return (
(this.cloneTableSummary.numPartitionsCompleted /
this.cloneTableSummary.numPartitionsTotal) *
100
);
}
}

function summarizeTableClone(clone: CloneTableSummary): TableCloneSummary {
return new TableCloneSummary(clone);
}

type SnapshotStatusProps = {
status: SnapshotStatus;
};

const ROWS_PER_PAGE = 5;
export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
const [sortField, setSortField] = useState<
'cloneStartTime' | 'avgTimePerPartition'
>('cloneStartTime');
const allRows = status.clones.map(summarizeTableClone);
const [currentPage, setCurrentPage] = useState(1);
const totalPages = Math.ceil(allRows.length / ROWS_PER_PAGE);
const [searchQuery, setSearchQuery] = useState<string>('');
const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc');
const displayedRows = useMemo(() => {
const shownRows = allRows.filter((row: TableCloneSummary) =>
row.cloneTableSummary.tableName
?.toLowerCase()
.includes(searchQuery.toLowerCase())
);
shownRows.sort((a, b) => {
const aValue = a[sortField];
const bValue = b[sortField];
if (aValue === null || bValue === null) {
return 0;
}

if (aValue < bValue) {
return sortDir === 'dsc' ? 1 : -1;
} else if (aValue > bValue) {
return sortDir === 'dsc' ? -1 : 1;
} else {
return 0;
}
});

const startRow = (currentPage - 1) * ROWS_PER_PAGE;
const endRow = startRow + ROWS_PER_PAGE;
return shownRows.length > ROWS_PER_PAGE
? shownRows.slice(startRow, endRow)
: shownRows;
}, [allRows, currentPage, searchQuery, sortField, sortDir]);

const handlePrevPage = () => {
if (currentPage > 1) {
setCurrentPage(currentPage - 1);
}
};

const handleNextPage = () => {
if (currentPage < totalPages) {
setCurrentPage(currentPage + 1);
}
};

const sortOptions = [
{ value: 'cloneStartTime', label: 'Start Time' },
{ value: 'avgTimePerPartition', label: 'Time Per Partition' },
];
return (
<div style={{ marginTop: '2rem' }}>
<Table
title={<Label variant='headline'>Initial Copy</Label>}
toolbar={{
left: (
<div style={{ display: 'flex', alignItems: 'center' }}>
<Button variant='normalBorderless' onClick={handlePrevPage}>
<Icon name='chevron_left' />
</Button>
<Button variant='normalBorderless' onClick={handleNextPage}>
<Icon name='chevron_right' />
</Button>
<Label>{`${currentPage} of ${totalPages}`}</Label>
<Button
variant='normalBorderless'
onClick={() => window.location.reload()}
>
<Icon name='refresh' />
</Button>
<div style={{ minWidth: '10em' }}>
<ReactSelect
options={sortOptions}
onChange={(val, _) => {
const sortVal =
(val?.value as
| 'cloneStartTime'
| 'avgTimePerPartition') ?? 'cloneStartTime';
setSortField(sortVal);
}}
value={{
value: sortField,
label: sortOptions.find((opt) => opt.value === sortField)
?.label,
}}
defaultValue={{
value: 'cloneStartTime',
label: 'Start Time',
}}
theme={SelectTheme}
/>
</div>
<button
className='IconButton'
onClick={() => setSortDir('asc')}
aria-label='sort up'
style={{ color: sortDir == 'asc' ? 'green' : 'gray' }}
>
<Icon name='arrow_upward' />
</button>
<button
className='IconButton'
onClick={() => setSortDir('dsc')}
aria-label='sort down'
style={{ color: sortDir == 'dsc' ? 'green' : 'gray' }}
>
<Icon name='arrow_downward' />
</button>
</div>
),
right: (
<SearchField
placeholder='Search by table name'
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setSearchQuery(e.target.value)
}
/>
),
}}
header={
<TableRow>
<TableCell as='th'>Table Identifier</TableCell>
<TableCell as='th'>Start Time</TableCell>
<TableCell as='th'>Progress Partitions</TableCell>
<TableCell as='th'>Num Rows Synced</TableCell>
<TableCell as='th'>Avg Time Per Partition</TableCell>
</TableRow>
}
>
{displayedRows.map((clone, index) => (
<TableRow key={index}>
<TableCell>
<Label>
<Link
href={`/mirrors/status/qrep/${clone.cloneTableSummary.flowJobName}`}
className='underline cursor-pointer'
>
{clone.cloneTableSummary.tableName}
</Link>
</Label>
</TableCell>
<TableCell>
<TimeLabel
timeVal={
clone.cloneStartTime?.format('YYYY-MM-DD HH:mm:ss') || 'N/A'
}
/>
</TableCell>
<TableCell>
<ProgressBar progress={clone.getPartitionProgressPercentage()} />
{clone.cloneTableSummary.numPartitionsCompleted} /{' '}
{clone.cloneTableSummary.numPartitionsTotal}
</TableCell>
<TableCell>{clone.cloneTableSummary.numRowsSynced}</TableCell>
<TableCell>
<Label>
{clone.avgTimePerPartition?.humanize({ ss: 1 }) || 'N/A'}
</Label>
</TableCell>
</TableRow>
))}
</Table>
</div>
);
};
import { SnapshotStatusTable } from './snapshot';

type CDCMirrorStatusProps = {
status: MirrorStatusResponse;
Expand Down
8 changes: 4 additions & 4 deletions ui/app/mirrors/[mirrorId]/configValues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ const MirrorValues = (mirrorConfig: FlowConnectionConfigs | undefined) => {
label: 'Snapshot Parallel Tables',
},
{
value: mirrorConfig?.cdcStagingPath || 'Local',
label: 'CDC Staging Path',
value: `${mirrorConfig?.softDelete}`,
label: 'Soft Delete',
},
{
value: mirrorConfig?.snapshotStagingPath || 'Local',
label: 'Snapshot Staging Path',
value: mirrorConfig?.cdcStagingPath || 'Local',
label: 'CDC Staging Path',
},
{
value: mirrorConfig?.snapshotStagingPath || 'Local',
Expand Down
Loading

0 comments on commit e1dbd98

Please sign in to comment.