diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go
index 19c818b91e..4a0bc236d4 100644
--- a/flow/cmd/mirror_status.go
+++ b/flow/cmd/mirror_status.go
@@ -85,14 +85,13 @@ func (h *FlowRequestHandler) CDCFlowStatus(
return nil, err
}
- cloneStatuses := []*protos.QRepMirrorStatus{}
+ cloneStatuses := []*protos.CloneTableSummary{}
for _, cloneJobName := range cloneJobNames {
- cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{
- FlowJobName: cloneJobName,
- })
+ cloneStatus, err := h.cloneTableSummary(ctx, cloneJobName)
if err != nil {
return nil, err
}
+
cloneStatuses = append(cloneStatuses, cloneStatus)
}
@@ -106,6 +105,71 @@ func (h *FlowRequestHandler) CDCFlowStatus(
}, nil
}
+func (h *FlowRequestHandler) cloneTableSummary(
+ ctx context.Context,
+ flowJobName string,
+) (*protos.CloneTableSummary, error) {
+ cfg := h.getQRepConfigFromCatalog(flowJobName)
+ res := &protos.CloneTableSummary{
+ FlowJobName: flowJobName,
+ TableName: cfg.DestinationTableIdentifier,
+ }
+
+ q := `
+ SELECT
+ MIN(start_time) AS StartTime,
+ COUNT(*) AS NumPartitionsTotal,
+ COUNT(CASE WHEN end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
+ SUM(rows_in_partition) FILTER (WHERE end_time IS NOT NULL) AS NumRowsSynced,
+ AVG(EXTRACT(EPOCH FROM (end_time - start_time)) * 1000) FILTER (WHERE end_time IS NOT NULL) AS AvgTimePerPartitionMs
+ FROM
+ peerdb_stats.qrep_partitions
+ WHERE
+ flow_name = $1
+ GROUP BY
+ flow_name;
+ `
+
+ var startTime pgtype.Timestamp
+ var numPartitionsTotal pgtype.Int8
+ var numPartitionsCompleted pgtype.Int8
+ var numRowsSynced pgtype.Int8
+ var avgTimePerPartitionMs pgtype.Float8
+
+ err := h.pool.QueryRow(ctx, q, flowJobName).Scan(
+ &startTime,
+ &numPartitionsTotal,
+ &numPartitionsCompleted,
+ &numRowsSynced,
+ &avgTimePerPartitionMs,
+ )
+ if err != nil {
+ return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
+ }
+
+ if startTime.Valid {
+ res.StartTime = timestamppb.New(startTime.Time)
+ }
+
+ if numPartitionsTotal.Valid {
+ res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
+ }
+
+ if numPartitionsCompleted.Valid {
+ res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64)
+ }
+
+ if numRowsSynced.Valid {
+ res.NumRowsSynced = int64(numRowsSynced.Int64)
+ }
+
+ if avgTimePerPartitionMs.Valid {
+ res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64)
+ }
+
+ return res, nil
+}
+
func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
diff --git a/protos/route.proto b/protos/route.proto
index 3fead611de..db330104b3 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -162,8 +162,18 @@ message PeerStatResponse {
repeated StatInfo stat_data = 1;
}
+message CloneTableSummary {
+ string table_name = 1;
+ google.protobuf.Timestamp start_time = 2;
+ int32 num_partitions_completed = 3;
+ int32 num_partitions_total = 4;
+ int64 num_rows_synced = 5;
+ int64 avg_time_per_partition_ms = 6;
+ string flow_job_name = 7;
+}
+
message SnapshotStatus {
- repeated QRepMirrorStatus clones = 1;
+ repeated CloneTableSummary clones = 1;
}
message CDCMirrorStatus {
diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
index 52d21c4999..63481fbc4d 100644
--- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
+++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
@@ -3,7 +3,7 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import TimeLabel from '@/components/TimeComponent';
import {
CDCMirrorStatus,
- QRepMirrorStatus,
+ CloneTableSummary,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
@@ -20,74 +20,36 @@ import ReactSelect from 'react-select';
import CdcDetails from './cdcDetails';
class TableCloneSummary {
- flowJobName: string;
- tableName: string;
- totalNumPartitions: number;
- totalNumRows: number;
- completedNumPartitions: number;
- completedNumRows: number;
- avgTimePerPartition: Duration | null;
- cloneStartTime: Moment | null;
+ cloneStartTime: Moment | null = null;
+ cloneTableSummary: CloneTableSummary;
+ avgTimePerPartition: Duration | null = null;
- constructor(clone: QRepMirrorStatus) {
- this.flowJobName = clone.config?.flowJobName || '';
- this.tableName = clone.config?.watermarkTable || '';
- this.totalNumPartitions = 0;
- this.totalNumRows = 0;
- this.completedNumPartitions = 0;
- this.completedNumRows = 0;
- this.avgTimePerPartition = null;
- this.cloneStartTime = null;
-
- this.calculate(clone);
- }
-
- private calculate(clone: QRepMirrorStatus): void {
- let totalTime = moment.duration(0);
- clone.partitions?.forEach((partition) => {
- this.totalNumPartitions++;
- this.totalNumRows += partition.numRows;
-
- if (partition.startTime) {
- let st = moment(partition.startTime);
- if (!this.cloneStartTime || st.isBefore(this.cloneStartTime)) {
- this.cloneStartTime = st;
- }
- }
-
- if (partition.endTime) {
- this.completedNumPartitions++;
- this.completedNumRows += partition.numRows;
- let st = moment(partition.startTime);
- let et = moment(partition.endTime);
- let duration = moment.duration(et.diff(st));
- totalTime = totalTime.add(duration);
- }
- });
-
- if (this.completedNumPartitions > 0) {
+ constructor(clone: CloneTableSummary) {
+ this.cloneTableSummary = clone;
+ if (clone.startTime) {
+ this.cloneStartTime = moment(clone.startTime);
+ }
+ if (clone.avgTimePerPartitionMs) {
this.avgTimePerPartition = moment.duration(
- totalTime.asMilliseconds() / this.completedNumPartitions
+ clone.avgTimePerPartitionMs,
+ 'ms'
);
}
}
- getRowProgressPercentage(): number {
- if (this.totalNumRows === 0) {
- return 0;
- }
- return (this.completedNumRows / this.totalNumRows) * 100;
- }
-
getPartitionProgressPercentage(): number {
- if (this.totalNumPartitions === 0) {
+ if (this.cloneTableSummary.numPartitionsTotal === 0) {
return 0;
}
- return (this.completedNumPartitions / this.totalNumPartitions) * 100;
+ return (
+ (this.cloneTableSummary.numPartitionsCompleted /
+ this.cloneTableSummary.numPartitionsTotal) *
+ 100
+ );
}
}
-function summarizeTableClone(clone: QRepMirrorStatus): TableCloneSummary {
+function summarizeTableClone(clone: CloneTableSummary): TableCloneSummary {
return new TableCloneSummary(clone);
}
@@ -231,10 +193,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
@@ -247,9 +209,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
- {clone.completedNumPartitions} / {clone.totalNumPartitions}
+ {clone.cloneTableSummary.numPartitionsCompleted} /{' '}
+ {clone.cloneTableSummary.numPartitionsTotal}
- {clone.completedNumRows}
+ {clone.cloneTableSummary.numRowsSynced}