diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 0cefe98fb5..a5b1a6c668 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -636,12 +636,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, shutdown <- struct{}{} }() - res, err := dstConn.SyncQRepRecords(config, partition, stream) + rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream) if err != nil { return fmt.Errorf("failed to sync records: %w", err) } - if res == 0 { + if rowsSynced == 0 { log.WithFields(log.Fields{ "flowName": config.FlowJobName, }).Infof("no records to push for partition %s\n", partition.PartitionId) @@ -650,9 +650,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, if goroutineErr != nil { return goroutineErr } + + err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition) + if err != nil { + return err + } + log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Infof("pushed %d records\n", res) + }).Infof("pushed %d records\n", rowsSynced) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) @@ -975,12 +981,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, shutdown <- struct{}{} }() - res, err := dstConn.SyncQRepRecords(config, partition, stream) + rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream) if err != nil { return 0, fmt.Errorf("failed to sync records: %w", err) } - if res == 0 { + if rowsSynced == 0 { log.WithFields(log.Fields{ "flowName": config.FlowJobName, }).Info("no records to push for xmin\n") @@ -989,9 +995,15 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, if err != nil { return 0, err } + + err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition) + if err != nil { + return 0, err + } + log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Infof("pushed %d records\n", res) + }).Infof("pushed %d records\n", rowsSynced) } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 5ff15c8cd3..a7a4bb9613 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -171,7 +171,7 @@ func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID "UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { - return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err) + return fmt.Errorf("error while updating start time for run_uuid %s in qrep_runs: %w", runUUID, err) } return nil @@ -182,7 +182,7 @@ func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID st "UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { - return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err) + return fmt.Errorf("error while updating end time for run_uuid %s in qrep_runs: %w", runUUID, err) } return nil @@ -299,3 +299,17 @@ func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID } return nil } + +func (c *CatalogMirrorMonitor) UpdateRowsSyncedForPartition(ctx context.Context, rowsSynced int, runUUID string, + partition *protos.QRepPartition) error { + if c == nil || c.catalogConn == nil { + return nil + } + + _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1 + WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId) + if err != nil { + return fmt.Errorf("error while updating rows_synced in qrep_partitions: %w", err) + } + return nil +} diff --git a/nexus/catalog/migrations/V14__qrep_rows_synced.sql b/nexus/catalog/migrations/V14__qrep_rows_synced.sql new file mode 100644 index 0000000000..242cf2deaf --- /dev/null +++ b/nexus/catalog/migrations/V14__qrep_rows_synced.sql @@ -0,0 +1,2 @@ +ALTER TABLE peerdb_stats.qrep_partitions +ADD COLUMN rows_synced INTEGER; \ No newline at end of file diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index ba9c45359c..a075432bb3 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -137,21 +137,23 @@ export default function CreateMirrors() { alignItems: 'center', }} > - - handlePeer(val, peerEnd as 'src' | 'dst', setConfig) - } - options={ - (peerEnd === 'src' - ? peers.filter((peer) => peer.type == DBType.POSTGRES) - : peers) ?? [] - } - getOptionValue={getPeerValue} - formatOptionLabel={getPeerLabel} - /> +
+ + handlePeer(val, peerEnd as 'src' | 'dst', setConfig) + } + options={ + (peerEnd === 'src' + ? peers.filter((peer) => peer.type == DBType.POSTGRES) + : peers) ?? [] + } + getOptionValue={getPeerValue} + formatOptionLabel={getPeerLabel} + /> +
{ const [currentPage, setCurrentPage] = useState(1); const totalPages = Math.ceil(allRows.length / ROWS_PER_PAGE); const [searchQuery, setSearchQuery] = useState(''); - const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('asc'); + const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc'); const displayedRows = useMemo(() => { const shownRows = allRows.filter((row: any) => row.tableName.toLowerCase().includes(searchQuery.toLowerCase()) @@ -156,7 +156,7 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { title={} toolbar={{ left: ( - <> +
@@ -170,21 +170,27 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { > - { - const sortVal = - (val?.value as 'cloneStartTime' | 'avgTimePerPartition') ?? - 'cloneStartTime'; - setSortField(sortVal); - }} - value={{ - value: sortField, - label: sortOptions.find((opt) => opt.value === sortField) - ?.label, - }} - defaultValue={{ value: 'cloneStartTime', label: 'Start Time' }} - /> +
+ { + const sortVal = + (val?.value as + | 'cloneStartTime' + | 'avgTimePerPartition') ?? 'cloneStartTime'; + setSortField(sortVal); + }} + value={{ + value: sortField, + label: sortOptions.find((opt) => opt.value === sortField) + ?.label, + }} + defaultValue={{ + value: 'cloneStartTime', + label: 'Start Time', + }} + /> +
- +
), right: ( { title={} toolbar={{ left: ( - <> +
@@ -118,24 +118,26 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { > - opt.value === sortField) - ?.label, - }} - onChange={(val, _) => { - const sortVal = - (val?.value as - | 'startTime' - | 'endTime' - | 'numRows' - | 'batchId') ?? 'batchId'; - setSortField(sortVal); - }} - defaultValue={{ value: 'batchId', label: 'Batch ID' }} - /> +
+ opt.value === sortField) + ?.label, + }} + onChange={(val, _) => { + const sortVal = + (val?.value as + | 'startTime' + | 'endTime' + | 'numRows' + | 'batchId') ?? 'batchId'; + setSortField(sortVal); + }} + defaultValue={{ value: 'batchId', label: 'Batch ID' }} + /> +
- +
), right: ( { {displayedRows.map((row) => ( - + - + + + + ); } @@ -91,15 +95,35 @@ export default function QRepStatusTable({ ); const [searchQuery, setSearchQuery] = useState(''); - const displayedPartitions = useMemo( - () => - visiblePartitions.filter((partition: QRepPartitionStatus) => { + const [sortField, setSortField] = useState<'startTime' | 'endTime'>( + 'startTime' + ); + const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc'); + const displayedPartitions = useMemo(() => { + let currentPartitions = [...visiblePartitions]; + (currentPartitions = currentPartitions.filter( + (partition: QRepPartitionStatus) => { return partition.partitionId .toLowerCase() .includes(searchQuery.toLowerCase()); - }), - [visiblePartitions, searchQuery] - ); + } + )), + currentPartitions.sort((a, b) => { + const aValue = a[sortField]; + const bValue = b[sortField]; + if (aValue === null || bValue === null) { + return 0; + } + if (aValue < bValue) { + return sortDir === 'dsc' ? 1 : -1; + } else if (aValue > bValue) { + return sortDir === 'dsc' ? -1 : 1; + } else { + return 0; + } + }); + return currentPartitions; + }, [visiblePartitions, searchQuery, sortField, sortDir]); const handleNext = () => { if (currentPage < totalPages) setCurrentPage(currentPage + 1); @@ -109,12 +133,16 @@ export default function QRepStatusTable({ if (currentPage > 1) setCurrentPage(currentPage - 1); }; + const sortOptions = [ + { value: 'startTime', label: 'Start Time' }, + { value: 'endTime', label: 'End Time' }, + ]; return ( Progress} toolbar={{ left: ( - <> +
+
+ opt.value === sortField) + ?.label, + }} + onChange={(val, _) => { + const sortVal = + (val?.value as 'startTime' | 'endTime') ?? 'startTime'; + setSortField(sortVal); + }} + defaultValue={{ value: 'startTime', label: 'Start Time' }} + /> +
+ +
- +
), right: ( (