diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 4a314a30a1..2692524aa3 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -625,12 +625,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
shutdown <- struct{}{}
}()
- res, err := dstConn.SyncQRepRecords(config, partition, stream)
+ rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
return fmt.Errorf("failed to sync records: %w", err)
}
- if res == 0 {
+ if rowsSynced == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("no records to push for partition %s\n", partition.PartitionId)
@@ -639,9 +639,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if goroutineErr != nil {
return goroutineErr
}
+
+ err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
+ if err != nil {
+ return err
+ }
+
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
- }).Infof("pushed %d records\n", res)
+ }).Infof("pushed %d records\n", rowsSynced)
}
err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
@@ -970,12 +976,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
shutdown <- struct{}{}
}()
- res, err := dstConn.SyncQRepRecords(config, partition, stream)
+ rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
return 0, fmt.Errorf("failed to sync records: %w", err)
}
- if res == 0 {
+ if rowsSynced == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("no records to push for xmin\n")
@@ -984,9 +990,15 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
if err != nil {
return 0, err
}
+
+ err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
+ if err != nil {
+ return 0, err
+ }
+
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
- }).Infof("pushed %d records\n", res)
+ }).Infof("pushed %d records\n", rowsSynced)
}
err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go
index 936aaa909c..d16a4d37fb 100644
--- a/flow/connectors/utils/monitoring/monitoring.go
+++ b/flow/connectors/utils/monitoring/monitoring.go
@@ -225,7 +225,7 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, ru
"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
- return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
+ return fmt.Errorf("error while updating start time for run_uuid %s in qrep_runs: %w", runUUID, err)
}
return nil
@@ -240,7 +240,7 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
"UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
- return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
+ return fmt.Errorf("error while updating end time for run_uuid %s in qrep_runs: %w", runUUID, err)
}
return nil
@@ -374,3 +374,17 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForPartition(ctx context.Context, ru
}
return nil
}
+
+func (c *CatalogMirrorMonitor) UpdateRowsSyncedForPartition(ctx context.Context, rowsSynced int, runUUID string,
+ partition *protos.QRepPartition) error {
+ if c == nil || c.catalogConn == nil {
+ return nil
+ }
+
+ _, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1
+ WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId)
+ if err != nil {
+ return fmt.Errorf("error while updating rows_synced in qrep_partitions: %w", err)
+ }
+ return nil
+}
diff --git a/nexus/catalog/migrations/V13__qrep_rows_synced.sql b/nexus/catalog/migrations/V13__qrep_rows_synced.sql
new file mode 100644
index 0000000000..242cf2deaf
--- /dev/null
+++ b/nexus/catalog/migrations/V13__qrep_rows_synced.sql
@@ -0,0 +1,2 @@
+ALTER TABLE peerdb_stats.qrep_partitions
+ADD COLUMN rows_synced INTEGER;
\ No newline at end of file
diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx
index d4da7c5920..feaf4a9399 100644
--- a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx
+++ b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx
@@ -180,7 +180,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => {
{displayedRows.map((row) => (
-
+
);
}
@@ -162,7 +167,8 @@ export default function QRepStatusTable({
'Duration',
'Start Time',
'End Time',
- 'Num Rows Synced',
+ 'Rows In Partition',
+ 'Rows Synced',
].map((heading, index) => (
diff --git a/ui/prisma/schema.prisma b/ui/prisma/schema.prisma
index 81007f1902..3e71753d4c 100644
--- a/ui/prisma/schema.prisma
+++ b/ui/prisma/schema.prisma
@@ -119,6 +119,7 @@ model qrep_partitions {
restart_count Int
metadata Json?
id Int @id @default(autoincrement())
+ rows_synced Int?
qrep_runs qrep_runs @relation(fields: [flow_name, run_uuid], references: [flow_name, run_uuid], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run")
@@unique([run_uuid, partition_uuid])
@@ -144,3 +145,17 @@ model qrep_runs {
@@index([start_time], map: "idx_qrep_runs_start_time")
@@schema("peerdb_stats")
}
+
+model peer_slot_size {
+ id Int @id @default(autoincrement())
+ slot_name String
+ peer_name String
+ redo_lsn String?
+ restart_lsn String?
+ confirmed_flush_lsn String?
+ slot_size BigInt?
+ updated_at DateTime @default(now()) @db.Timestamp(6)
+
+ @@index([slot_name], map: "index_slot_name")
+ @@schema("peerdb_stats")
+}