From 48089d58115a942fd3cc0d90712f85ef6ccb1819 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 11 Dec 2023 16:17:13 +0530 Subject: [PATCH 1/2] fix rows synced and batch id display --- flow/activities/flowable.go | 24 ++++++++++++++----- .../connectors/utils/monitoring/monitoring.go | 18 ++++++++++++-- .../migrations/V13__qrep_rows_synced.sql | 2 ++ .../edit/[mirrorId]/syncStatusTable.tsx | 2 +- .../mirrors/status/qrep/[mirrorId]/page.tsx | 5 ++-- .../qrep/[mirrorId]/qrepStatusTable.tsx | 12 +++++++--- ui/prisma/schema.prisma | 15 ++++++++++++ 7 files changed, 64 insertions(+), 14 deletions(-) create mode 100644 nexus/catalog/migrations/V13__qrep_rows_synced.sql diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index fae47a2f5c..8f8b4f2b3b 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -626,12 +626,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, shutdown <- struct{}{} }() - res, err := dstConn.SyncQRepRecords(config, partition, stream) + rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream) if err != nil { return fmt.Errorf("failed to sync records: %w", err) } - if res == 0 { + if rowsSynced == 0 { log.WithFields(log.Fields{ "flowName": config.FlowJobName, }).Infof("no records to push for partition %s\n", partition.PartitionId) @@ -640,9 +640,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, if goroutineErr != nil { return goroutineErr } + + err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition) + if err != nil { + return err + } + log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Infof("pushed %d records\n", res) + }).Infof("pushed %d records\n", rowsSynced) } err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) @@ -971,12 +977,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, shutdown <- struct{}{} }() - res, err := dstConn.SyncQRepRecords(config, partition, stream) + rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream) if err != nil { return 0, fmt.Errorf("failed to sync records: %w", err) } - if res == 0 { + if rowsSynced == 0 { log.WithFields(log.Fields{ "flowName": config.FlowJobName, }).Info("no records to push for xmin\n") @@ -985,9 +991,15 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, if err != nil { return 0, err } + + err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition) + if err != nil { + return 0, err + } + log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Infof("pushed %d records\n", res) + }).Infof("pushed %d records\n", rowsSynced) } err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 06a6b4f125..99415cd6da 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -225,7 +225,7 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, ru "UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { - return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err) + return fmt.Errorf("error while updating start time for run_uuid %s in qrep_runs: %w", runUUID, err) } return nil @@ -240,7 +240,7 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU "UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2", time.Now(), runUUID) if err != nil { - return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err) + return fmt.Errorf("error while updating end time for run_uuid %s in qrep_runs: %w", runUUID, err) } return nil @@ -375,3 +375,17 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForPartition(ctx context.Context, ru } return nil } + +func (c *CatalogMirrorMonitor) UpdateRowsSyncedForPartition(ctx context.Context, rowsSynced int, runUUID string, + partition *protos.QRepPartition) error { + if c == nil || c.catalogConn == nil { + return nil + } + + _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1 + WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId) + if err != nil { + return fmt.Errorf("error while updating rows_synced in qrep_partitions: %w", err) + } + return nil +} diff --git a/nexus/catalog/migrations/V13__qrep_rows_synced.sql b/nexus/catalog/migrations/V13__qrep_rows_synced.sql new file mode 100644 index 0000000000..242cf2deaf --- /dev/null +++ b/nexus/catalog/migrations/V13__qrep_rows_synced.sql @@ -0,0 +1,2 @@ +ALTER TABLE peerdb_stats.qrep_partitions +ADD COLUMN rows_synced INTEGER; \ No newline at end of file diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx index 251c5cfb49..c4cb790578 100644 --- a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx @@ -182,7 +182,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { {displayedRows.map((row) => ( - + ); } @@ -217,7 +222,8 @@ export default function QRepStatusTable({ 'Duration', 'Start Time', 'End Time', - 'Num Rows Synced', + 'Rows In Partition', + 'Rows Synced', ].map((heading, index) => (