Skip to content

Commit

Permalink
Metrics For XMIN (#762)
Browse files Browse the repository at this point in the history
Restores metrics for XMIN mirror.
Since the first partition (first sync) of an XMIN mirror is going to
have a nil range, here we make a name-sake range with end as number of
records pulled and storing that in stats table.

Fixes #761
  • Loading branch information
Amogh-Bharadwaj authored Dec 6, 2023
1 parent cd60e3f commit 1c92ac3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
33 changes: 27 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now())
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}
Expand Down Expand Up @@ -847,11 +847,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) (int64, error) {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
if err != nil {
return 0, fmt.Errorf("failed to update start time for partition: %w", err)
}

startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down Expand Up @@ -885,11 +881,36 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}).Errorf("failed to pull records: %v", err)
return err
}

// The first sync of an XMIN mirror will have a partition without a range
// A nil range is not supported by the catalog mirror monitor functions below
// So I'm creating a partition with a range of 0 to numRecords
partitionForMetrics := partition
if partition.Range == nil {
partitionForMetrics = &protos.QRepPartition{
PartitionId: partition.PartitionId,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
}},
}
}
updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}

err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords))
if err != nil {
log.Errorf("%v", err)
return err
}

return nil
})

Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,14 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForPartition(
ctx context.Context,
runUUID string,
partition *protos.QRepPartition,
startTime time.Time,
) error {
if c == nil || c.catalogConn == nil {
return nil
}

_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId)
WHERE run_uuid=$2 AND partition_uuid=$3`, startTime, runUUID, partition.PartitionId)
if err != nil {
return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions ui/app/mirrors/status/qrep/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ export default async function QRepMirrorStatus({
start_time: {
not: null,
},
rows_in_partition: {
not: 0,
},
},
orderBy: {
start_time: 'desc',
Expand Down

0 comments on commit 1c92ac3

Please sign in to comment.