Skip to content

Commit

Permalink
fix rows synced and batch id display
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 11, 2023
1 parent 77cd516 commit 83c5926
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 14 deletions.
24 changes: 18 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,12 +625,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
shutdown <- struct{}{}
}()

res, err := dstConn.SyncQRepRecords(config, partition, stream)
rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
return fmt.Errorf("failed to sync records: %w", err)
}

if res == 0 {
if rowsSynced == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("no records to push for partition %s\n", partition.PartitionId)
Expand All @@ -639,9 +639,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if goroutineErr != nil {
return goroutineErr
}

err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
if err != nil {
return err
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", res)
}).Infof("pushed %d records\n", rowsSynced)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
Expand Down Expand Up @@ -970,12 +976,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
shutdown <- struct{}{}
}()

res, err := dstConn.SyncQRepRecords(config, partition, stream)
rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
return 0, fmt.Errorf("failed to sync records: %w", err)
}

if res == 0 {
if rowsSynced == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("no records to push for xmin\n")
Expand All @@ -984,9 +990,15 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
if err != nil {
return 0, err
}

err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", res)
}).Infof("pushed %d records\n", rowsSynced)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
Expand Down
18 changes: 16 additions & 2 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (c *CatalogMirrorMonitor) UpdateStartTimeForQRepRun(ctx context.Context, ru
"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
return fmt.Errorf("error while updating start time for run_uuid %s in qrep_runs: %w", runUUID, err)
}

return nil
Expand All @@ -240,7 +240,7 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
"UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
return fmt.Errorf("error while updating end time for run_uuid %s in qrep_runs: %w", runUUID, err)
}

return nil
Expand Down Expand Up @@ -374,3 +374,17 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForPartition(ctx context.Context, ru
}
return nil
}

func (c *CatalogMirrorMonitor) UpdateRowsSyncedForPartition(ctx context.Context, rowsSynced int, runUUID string,
partition *protos.QRepPartition) error {
if c == nil || c.catalogConn == nil {
return nil
}

_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId)
if err != nil {
return fmt.Errorf("error while updating rows_synced in qrep_partitions: %w", err)
}
return nil
}
2 changes: 2 additions & 0 deletions nexus/catalog/migrations/V13__qrep_rows_synced.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE peerdb_stats.qrep_partitions
ADD COLUMN rows_synced INTEGER;
2 changes: 1 addition & 1 deletion ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => {
{displayedRows.map((row) => (
<TableRow key={row.batchId}>
<TableCell>
<Label>{row.batchId}</Label>
<Label>{Number(row.batchId)}</Label>
</TableCell>
<TableCell>
<Label>
Expand Down
5 changes: 3 additions & 2 deletions ui/app/mirrors/status/qrep/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export default async function QRepMirrorStatus({
runUuid: run.run_uuid,
startTime: run.start_time,
endTime: run.end_time,
numRows: run.rows_in_partition,
pulledRows: run.rows_in_partition,
syncedRows: run.rows_synced,
status: '',
};
return ret;
Expand All @@ -50,7 +51,7 @@ export default async function QRepMirrorStatus({
partitionID: partition.partitionId,
startTime: partition.startTime,
endTime: partition.endTime,
numRows: partition.numRows,
numRows: partition.pulledRows,
}))}
/>
<br></br>
Expand Down
12 changes: 9 additions & 3 deletions ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ export type QRepPartitionStatus = {
status: string;
startTime: Date | null;
endTime: Date | null;
numRows: number | null;
pulledRows: number | null;
syncedRows: number | null;
};

function TimeOrProgressBar({ time }: { time: Date | null }) {
Expand All @@ -33,7 +34,8 @@ function RowPerPartition({
status,
startTime,
endTime,
numRows,
pulledRows: numRows,
syncedRows,
}: QRepPartitionStatus) {
let duration = 'N/A';
if (startTime && endTime) {
Expand Down Expand Up @@ -68,6 +70,9 @@ function RowPerPartition({
<TableCell>
<Label>{numRows}</Label>
</TableCell>
<TableCell>
<Label>{syncedRows ?? 0}</Label>
</TableCell>
</TableRow>
);
}
Expand Down Expand Up @@ -162,7 +167,8 @@ export default function QRepStatusTable({
'Duration',
'Start Time',
'End Time',
'Num Rows Synced',
'Rows In Partition',
'Rows Synced',
].map((heading, index) => (
<TableCell as='th' key={index}>
<Label as='label' style={{ fontWeight: 'bold' }}>
Expand Down
15 changes: 15 additions & 0 deletions ui/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ model qrep_partitions {
restart_count Int
metadata Json?
id Int @id @default(autoincrement())
rows_synced Int?
qrep_runs qrep_runs @relation(fields: [flow_name, run_uuid], references: [flow_name, run_uuid], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run")
@@unique([run_uuid, partition_uuid])
Expand All @@ -144,3 +145,17 @@ model qrep_runs {
@@index([start_time], map: "idx_qrep_runs_start_time")
@@schema("peerdb_stats")
}

model peer_slot_size {
id Int @id @default(autoincrement())
slot_name String
peer_name String
redo_lsn String?
restart_lsn String?
confirmed_flush_lsn String?
slot_size BigInt?
updated_at DateTime @default(now()) @db.Timestamp(6)
@@index([slot_name], map: "index_slot_name")
@@schema("peerdb_stats")
}

0 comments on commit 83c5926

Please sign in to comment.