diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 4ecb3afcd8..f66efa25d1 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -418,7 +418,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 		shutdown <- true
 	}()
 
-	startTime := time.Now()
 	partitions, err := srcConn.GetQRepPartitions(config, last)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get partitions from source: %w", err)
@@ -428,7 +427,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 		ctx,
 		config,
 		runUUID,
-		startTime,
 		partitions,
 	)
 	if err != nil {
@@ -447,6 +445,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
 	partitions *protos.QRepPartitionBatch,
 	runUUID string,
 ) error {
+	err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID)
+	if err != nil {
+		return fmt.Errorf("failed to update start time for qrep run: %w", err)
+	}
+
 	numPartitions := len(partitions.Partitions)
 	log.Infof("replicating partitions for job - %s - batch %d - size: %d\n",
 		config.FlowJobName, partitions.BatchId, numPartitions)
@@ -469,6 +472,11 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	partition *protos.QRepPartition,
 	runUUID string,
 ) error {
+	err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
+	if err != nil {
+		return fmt.Errorf("failed to update start time for partition: %w", err)
+	}
+
 	ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
 	srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
 	if err != nil {
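Taken together, the flowable.go hunks move start-time bookkeeping out of partition discovery and into the replication path. Below is a minimal sketch of the resulting call order; only the CatalogMirrorMonitor methods and protos types are from this diff, while the wrapper function, its signature, and the elided pull/sync step are illustrative assumptions:

```go
// Sketch only: imagines the activity-side flow after this change; it is not
// code from the repository. The monitor methods shown are the real ones.
func qrepMonitoringOrderSketch(ctx context.Context, a *FlowableActivity,
	config *protos.QRepConfig, runUUID string, batch *protos.QRepPartitionBatch) error {
	// GetQRepPartitions: the run row is created up front, now without a start_time.
	if err := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, batch.Partitions); err != nil {
		return err
	}
	// ReplicateQRepPartitions: the run's start_time is stamped only when
	// replication of the batch actually begins.
	if err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID); err != nil {
		return err
	}
	for _, partition := range batch.Partitions {
		// replicateQRepPartition: each partition records its own start_time
		// just before it is pulled and synced.
		if err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition); err != nil {
			return err
		}
		// ... pull from the source peer and sync to the destination ...
	}
	return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
}
```

The apparent effect is that time a run spends queued between partition discovery and actual replication no longer counts toward its reported duration.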
diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go
index 367ea81a85..8c5e9a22e7 100644
--- a/flow/connectors/utils/monitoring/monitoring.go
+++ b/flow/connectors/utils/monitoring/monitoring.go
@@ -157,7 +157,6 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
 	ctx context.Context,
 	config *protos.QRepConfig,
 	runUUID string,
-	startTime time.Time,
 	partitions []*protos.QRepPartition,
 ) error {
 	if c == nil || c.catalogConn == nil {
@@ -166,8 +165,8 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
 	flowJobName := config.GetFlowJobName()
 	_, err := c.catalogConn.Exec(ctx,
-		"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid,start_time) VALUES($1,$2,$3) ON CONFLICT DO NOTHING",
-		flowJobName, runUUID, startTime)
+		"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid) VALUES($1,$2) ON CONFLICT DO NOTHING",
+		flowJobName, runUUID)
 	if err != nil {
 		return fmt.Errorf("error while inserting qrep run in qrep_runs: %w", err)
 	}
@@ -193,6 +192,21 @@ func (c *CatalogMirrorMonitor) InitializeQRepRun(
 	return nil
 }
 
+func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, runUUID string) error {
+	if c == nil || c.catalogConn == nil {
+		return nil
+	}
+
+	_, err := c.catalogConn.Exec(ctx,
+		"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
+		time.Now(), runUUID)
+	if err != nil {
+		return fmt.Errorf("error while updating start_time for run_uuid %s in qrep_runs: %w", runUUID, err)
+	}
+
+	return nil
+}
+
 func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runUUID string) error {
 	if c == nil || c.catalogConn == nil {
 		return nil
 	}
@@ -253,10 +267,10 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo
 
 	_, err := c.catalogConn.Exec(ctx,
 		`INSERT INTO peerdb_stats.qrep_partitions
-			(flow_name,run_uuid,partition_uuid,partition_start,partition_end,start_time,restart_count)
-			VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
+			(flow_name,run_uuid,partition_uuid,partition_start,partition_end,restart_count)
+			VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT(run_uuid,partition_uuid) DO UPDATE SET
 			restart_count=qrep_partitions.restart_count+1`,
-		flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, time.Now(), 0)
+		flowJobName, runUUID, partition.PartitionId, rangeStart, rangeEnd, 0)
 	if err != nil {
 		return fmt.Errorf("error while inserting qrep partition in qrep_partitions: %w", err)
 	}
@@ -264,6 +278,23 @@ func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJo
 	return nil
 }
 
+func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition(
+	ctx context.Context,
+	runUUID string,
+	partition *protos.QRepPartition,
+) error {
+	if c == nil || c.catalogConn == nil {
+		return nil
+	}
+
+	_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
+		WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId)
+	if err != nil {
+		return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
+	}
+	return nil
+}
+
 func (c *CatalogMirrorMonitor) UpdatePullEndTimeAndRowsForPartition(ctx context.Context,
 	runUUID string, partition *protos.QRepPartition, rowsInPartition int64) error {
 	if c == nil || c.catalogConn == nil {
diff --git a/nexus/catalog/migrations/V10__mirror_drop_bad_constraints.sql b/nexus/catalog/migrations/V10__mirror_drop_bad_constraints.sql
new file mode 100644
index 0000000000..7bf7de1643
--- /dev/null
+++ b/nexus/catalog/migrations/V10__mirror_drop_bad_constraints.sql
@@ -0,0 +1,19 @@
+-- Drop the foreign key constraint from qrep_partitions to qrep_runs
+ALTER TABLE peerdb_stats.qrep_partitions
+DROP CONSTRAINT fk_qrep_partitions_run_uuid;
+
+-- Drop the unique constraint for flow_name from qrep_runs
+ALTER TABLE peerdb_stats.qrep_runs
+DROP CONSTRAINT uq_qrep_runs_flow_name;
+
+-- Add unique constraint to qrep_runs for (flow_name, run_uuid)
+ALTER TABLE peerdb_stats.qrep_runs
+ADD CONSTRAINT uq_qrep_runs_flow_run
+UNIQUE (flow_name, run_uuid);
+
+-- Add foreign key from qrep_partitions to qrep_runs
+ALTER TABLE peerdb_stats.qrep_partitions
+ADD CONSTRAINT fk_qrep_partitions_run
+FOREIGN KEY (flow_name, run_uuid)
+REFERENCES peerdb_stats.qrep_runs(flow_name, run_uuid)
+ON DELETE CASCADE;
diff --git a/nexus/catalog/migrations/V11__qrep_runs_start_time_nullable.sql b/nexus/catalog/migrations/V11__qrep_runs_start_time_nullable.sql
new file mode 100644
index 0000000000..3e78249110
--- /dev/null
+++ b/nexus/catalog/migrations/V11__qrep_runs_start_time_nullable.sql
@@ -0,0 +1,5 @@
+ALTER TABLE peerdb_stats.qrep_runs
+ALTER COLUMN start_time DROP NOT NULL;
+
+ALTER TABLE peerdb_stats.qrep_partitions
+ALTER COLUMN start_time DROP NOT NULL;
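With V11 applied, start_time is NULL for runs and partitions that have been registered by InitializeQRepRun but have not begun replicating, so anything reading peerdb_stats has to treat the column as optional (the new status page further down filters on a non-null start_time for the same reason). A minimal sketch of that read pattern, using a plain database/sql handle purely for illustration; the catalog code itself talks to Postgres through pgx, and the function below is an assumption, not repository code:

```go
package peerdbstatssketch

import (
	"context"
	"database/sql"
	"time"
)

// qrepRunStartTime returns nil when the run exists but has not started yet.
// Table and column names come from the migrations above; everything else is illustrative.
func qrepRunStartTime(ctx context.Context, db *sql.DB, runUUID string) (*time.Time, error) {
	var startTime sql.NullTime
	err := db.QueryRowContext(ctx,
		"SELECT start_time FROM peerdb_stats.qrep_runs WHERE run_uuid = $1", runUUID,
	).Scan(&startTime)
	if err != nil {
		return nil, err
	}
	if !startTime.Valid {
		// Row inserted by InitializeQRepRun, replication not started yet.
		return nil, nil
	}
	return &startTime.Time, nil
}
```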
diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
index 00bbd2a5c6..a83e31cc08 100644
--- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
+++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
@@ -14,6 +14,7 @@
 import { ProgressBar } from '@/lib/ProgressBar';
 import { SearchField } from '@/lib/SearchField';
 import { Table, TableCell, TableRow } from '@/lib/Table';
 import moment, { Duration, Moment } from 'moment';
+import Link from 'next/link';
 
 const Badges = [
@@ -35,6 +36,7 @@ const Badges = [
 ];
 
 class TableCloneSummary {
+  flowJobName: string;
   tableName: string;
   totalNumPartitions: number;
   totalNumRows: number;
@@ -44,6 +46,7 @@ class TableCloneSummary {
   cloneStartTime: Moment | null;
 
   constructor(clone: QRepMirrorStatus) {
+    this.flowJobName = clone.config?.flowJobName || '';
     this.tableName = clone.config?.watermarkTable || '';
     this.totalNumPartitions = 0;
     this.totalNumRows = 0;
@@ -151,7 +154,14 @@ const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => (
-
+
diff --git a/ui/app/mirrors/edit/[mirrorId]/qrep.tsx b/ui/app/mirrors/edit/[mirrorId]/qrep.tsx
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx
new file mode 100644
index 0000000000..131d788ee6
--- /dev/null
+++ b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx
@@ -0,0 +1,43 @@
+import prisma from '@/app/utils/prisma';
+import { Header } from '@/lib/Header';
+import { LayoutMain } from '@/lib/Layout';
+import QRepStatusTable, { QRepPartitionStatus } from './qrepStatusTable';
+
+type QRepMirrorStatusProps = {
+  params: { mirrorId: string };
+};
+
+export default async function QRepMirrorStatus({
+  params: { mirrorId },
+}: QRepMirrorStatusProps) {
+  const runs = await prisma.qrep_partitions.findMany({
+    where: {
+      flow_name: mirrorId,
+      start_time: {
+        not: null,
+      },
+    },
+    orderBy: {
+      start_time: 'desc',
+    },
+  });
+
+  const partitions = runs.map((run) => {
+    let ret: QRepPartitionStatus = {
+      partitionId: run.partition_uuid,
+      runUuid: run.run_uuid,
+      startTime: run.start_time,
+      endTime: run.end_time,
+      numRows: run.rows_in_partition,
+      status: '',
+    };
+    return ret;
+  });
+
+  return (
+
+    {mirrorId}
+
+
+  );
+}
diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx
new file mode 100644
index 0000000000..55ee7d5819
--- /dev/null
+++ b/ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx
@@ -0,0 +1,146 @@
+'use client';
+
+import { Button } from '@/lib/Button';
+import { Checkbox } from '@/lib/Checkbox';
+import { Icon } from '@/lib/Icon';
+import { Label } from '@/lib/Label';
+import { ProgressCircle } from '@/lib/ProgressCircle';
+import { SearchField } from '@/lib/SearchField';
+import { Table, TableCell, TableRow } from '@/lib/Table';
+import moment from 'moment';
+import { useState } from 'react';
+
+export type QRepPartitionStatus = {
+  partitionId: string;
+  runUuid: string;
+  status: string;
+  startTime: Date | null;
+  endTime: Date | null;
+  numRows: number | null;
+};
+
+function TimeOrProgressBar({ time }: { time: Date | null }) {
+  if (time === null) {
+    return ;
+  } else {
+    return ;
+  }
+}
+
+function RowPerPartition({
+  partitionId,
+  runUuid,
+  status,
+  startTime,
+  endTime,
+  numRows,
+}: QRepPartitionStatus) {
+  return (
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+  );
+}
+
+type QRepStatusTableProps = {
+  flowJobName: string;
+  partitions: QRepPartitionStatus[];
+};
+
+export default function QRepStatusTable({
+  flowJobName,
+  partitions,
+}: QRepStatusTableProps) {
+  const ROWS_PER_PAGE = 10;
+  const [currentPage, setCurrentPage] = useState(1);
+  const totalPages = Math.ceil(partitions.length / ROWS_PER_PAGE);
+
+  const visiblePartitions = partitions.slice(
+    (currentPage - 1) * ROWS_PER_PAGE,
+    currentPage * ROWS_PER_PAGE
+  );
+
+  const handleNext = () => {
+    if (currentPage < totalPages) setCurrentPage(currentPage + 1);
+  };
+
+  const handlePrevious = () => {
+    if (currentPage > 1) setCurrentPage(currentPage - 1);
+  };
+
+  return (
+    Progress} toolbar={{ left: ( <>
+
+
+
+
+
+
+
+
+        ), right: , }} header={
+
+
+
+          Partition UUID
+          Run UUID
+          Start Time
+          End Time
+          Num Rows Synced
+
+      } >
+      {visiblePartitions.map((partition, index) => (
+
+      ))}
+
+  );
+}
diff --git a/ui/prisma/schema.prisma b/ui/prisma/schema.prisma
index 0e55d6eaa4..81007f1902 100644
--- a/ui/prisma/schema.prisma
+++ b/ui/prisma/schema.prisma
@@ -113,13 +113,13 @@ model qrep_partitions {
   partition_start   String
   partition_end     String
   rows_in_partition Int?
-  start_time        DateTime  @db.Timestamp(6)
+  start_time        DateTime? @db.Timestamp(6)
   pull_end_time     DateTime? @db.Timestamp(6)
   end_time          DateTime? @db.Timestamp(6)
   restart_count     Int
   metadata          Json?
   id                Int       @id @default(autoincrement())
-  qrep_runs         qrep_runs @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run_uuid")
+  qrep_runs         qrep_runs @relation(fields: [flow_name, run_uuid], references: [flow_name, run_uuid], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run")
 
   @@unique([run_uuid, partition_uuid])
   @@index([flow_name, run_uuid], map: "idx_qrep_partitions_flow_name_run_uuid")
@@ -129,15 +129,16 @@ model qrep_partitions {
 
 model qrep_runs {
-  flow_name       String            @unique(map: "uq_qrep_runs_flow_name")
+  flow_name       String
   run_uuid        String
-  start_time      DateTime          @db.Timestamp(6)
+  start_time      DateTime?         @db.Timestamp(6)
   end_time        DateTime?         @db.Timestamp(6)
   metadata        Json?
   config_proto    Bytes?
   id              Int               @id @default(autoincrement())
   qrep_partitions qrep_partitions[]
 
+  @@unique([flow_name, run_uuid], map: "uq_qrep_runs_flow_run")
   @@index([flow_name], map: "idx_qrep_runs_flow_name", type: Hash)
   @@index([run_uuid], map: "idx_qrep_runs_run_uuid", type: Hash)
   @@index([start_time], map: "idx_qrep_runs_start_time")
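Net effect of V10 together with the Prisma changes above: a partition row now references its run through the composite (flow_name, run_uuid) pair instead of flow_name alone, so a single mirror can accumulate many runs without tripping the old uq_qrep_runs_flow_name constraint. A hedged sketch of a catalog query that relies on the new relationship, again using database/sql only for illustration; the function, row struct, and connection handling are assumptions:

```go
package peerdbstatssketch

import (
	"context"
	"database/sql"
)

// partitionRow mirrors the columns the status page reads; illustrative only.
type partitionRow struct {
	PartitionUUID string
	RunUUID       string
	StartTime     sql.NullTime
	EndTime       sql.NullTime
	NumRows       sql.NullInt64
}

// listStartedPartitions joins partitions to runs on the composite key added in V10.
func listStartedPartitions(ctx context.Context, db *sql.DB, flowJobName string) ([]partitionRow, error) {
	rows, err := db.QueryContext(ctx, `
		SELECT p.partition_uuid, p.run_uuid, p.start_time, p.end_time, p.rows_in_partition
		FROM peerdb_stats.qrep_partitions p
		JOIN peerdb_stats.qrep_runs r
		  ON (p.flow_name, p.run_uuid) = (r.flow_name, r.run_uuid)
		WHERE r.flow_name = $1 AND p.start_time IS NOT NULL
		ORDER BY p.start_time DESC`, flowJobName)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []partitionRow
	for rows.Next() {
		var pr partitionRow
		if err := rows.Scan(&pr.PartitionUUID, &pr.RunUUID, &pr.StartTime, &pr.EndTime, &pr.NumRows); err != nil {
			return nil, err
		}
		out = append(out, pr)
	}
	return out, rows.Err()
}
```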