diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index bd460e9661..6b71e6b907 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -500,7 +500,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, partition *protos.QRepPartition, runUUID string, ) error { - err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition) + err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now()) if err != nil { return fmt.Errorf("failed to update start time for partition: %w", err) } @@ -847,11 +847,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, partition *protos.QRepPartition, runUUID string, ) (int64, error) { - err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition) - if err != nil { - return 0, fmt.Errorf("failed to update start time for partition: %w", err) - } - + startTime := time.Now() srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) if err != nil { return 0, fmt.Errorf("failed to get qrep source connector: %w", err) @@ -885,11 +881,36 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }).Errorf("failed to pull records: %v", err) return err } + + // The first sync of an XMIN mirror will have a partition without a range + // A nil range is not supported by the catalog mirror monitor functions below + // So I'm creating a partition with a range of 0 to numRecords + partitionForMetrics := partition + if partition.Range == nil { + partitionForMetrics = &protos.QRepPartition{ + PartitionId: partition.PartitionId, + Range: &protos.PartitionRange{ + Range: &protos.PartitionRange_IntRange{ + IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)}, + }}, + } + } + updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) + if updateErr != nil { + return updateErr + } + + err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime) + if err != nil { + return fmt.Errorf("failed to update start time for partition: %w", err) + } + err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords)) if err != nil { log.Errorf("%v", err) return err } + return nil }) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index ed63d30f74..1502d04a0f 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -306,13 +306,14 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition( ctx context.Context, runUUID string, partition *protos.QRepPartition, + startTime time.Time, ) error { if c == nil || c.catalogConn == nil { return nil } _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1 - WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId) + WHERE run_uuid=$2 AND partition_uuid=$3`, startTime, runUUID, partition.PartitionId) if err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err) } diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx index a3fed7e0e9..1f616421a9 100644 --- a/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx +++ b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx @@ -19,6 +19,9 @@ export default async function QRepMirrorStatus({ start_time: { not: null, }, + rows_in_partition: { + not: 0, + }, }, orderBy: { start_time: 'desc',