Skip to content

Commit

Permalink
Merge branch 'main' into one-pool
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 12, 2023
2 parents ef2977e + 15ee6a2 commit 829f9a8
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 78 deletions.
24 changes: 18 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,12 +636,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
shutdown <- struct{}{}
}()

res, err := dstConn.SyncQRepRecords(config, partition, stream)
rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
return fmt.Errorf("failed to sync records: %w", err)
}

if res == 0 {
if rowsSynced == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("no records to push for partition %s\n", partition.PartitionId)
Expand All @@ -650,9 +650,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if goroutineErr != nil {
return goroutineErr
}

err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
if err != nil {
return err
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", res)
}).Infof("pushed %d records\n", rowsSynced)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
Expand Down Expand Up @@ -975,12 +981,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
shutdown <- struct{}{}
}()

res, err := dstConn.SyncQRepRecords(config, partition, stream)
rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
return 0, fmt.Errorf("failed to sync records: %w", err)
}

if res == 0 {
if rowsSynced == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("no records to push for xmin\n")
Expand All @@ -989,9 +995,15 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
if err != nil {
return 0, err
}

err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", res)
}).Infof("pushed %d records\n", rowsSynced)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
Expand Down
18 changes: 16 additions & 2 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID
"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
return fmt.Errorf("error while updating start time for run_uuid %s in qrep_runs: %w", runUUID, err)
}

return nil
Expand All @@ -182,7 +182,7 @@ func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID st
"UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
if err != nil {
return fmt.Errorf("error while updating num_rows_to_sync for run_uuid %s in qrep_runs: %w", runUUID, err)
return fmt.Errorf("error while updating end time for run_uuid %s in qrep_runs: %w", runUUID, err)
}

return nil
Expand Down Expand Up @@ -299,3 +299,17 @@ func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID
}
return nil
}

func (c *CatalogMirrorMonitor) UpdateRowsSyncedForPartition(ctx context.Context, rowsSynced int, runUUID string,
partition *protos.QRepPartition) error {
if c == nil || c.catalogConn == nil {
return nil
}

_, err := c.catalogConn.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET rows_synced=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, rowsSynced, runUUID, partition.PartitionId)
if err != nil {
return fmt.Errorf("error while updating rows_synced in qrep_partitions: %w", err)
}
return nil
}
2 changes: 2 additions & 0 deletions nexus/catalog/migrations/V14__qrep_rows_synced.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE peerdb_stats.qrep_partitions
ADD COLUMN rows_synced INTEGER;
32 changes: 17 additions & 15 deletions ui/app/mirrors/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,23 @@ export default function CreateMirrors() {
alignItems: 'center',
}}
>
<ReactSelect
placeholder={`Select the ${
peerEnd === 'src' ? 'source' : 'destination'
} peer`}
onChange={(val, action) =>
handlePeer(val, peerEnd as 'src' | 'dst', setConfig)
}
options={
(peerEnd === 'src'
? peers.filter((peer) => peer.type == DBType.POSTGRES)
: peers) ?? []
}
getOptionValue={getPeerValue}
formatOptionLabel={getPeerLabel}
/>
<div style={{ width: '100%' }}>
<ReactSelect
placeholder={`Select the ${
peerEnd === 'src' ? 'source' : 'destination'
} peer`}
onChange={(val, action) =>
handlePeer(val, peerEnd as 'src' | 'dst', setConfig)
}
options={
(peerEnd === 'src'
? peers.filter((peer) => peer.type == DBType.POSTGRES)
: peers) ?? []
}
getOptionValue={getPeerValue}
formatOptionLabel={getPeerLabel}
/>
</div>
<InfoPopover
tips={
'The peer from which we will be replicating data. Ensure the prerequisites for this peer are met.'
Expand Down
42 changes: 24 additions & 18 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
const [currentPage, setCurrentPage] = useState(1);
const totalPages = Math.ceil(allRows.length / ROWS_PER_PAGE);
const [searchQuery, setSearchQuery] = useState<string>('');
const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('asc');
const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc');
const displayedRows = useMemo(() => {
const shownRows = allRows.filter((row: any) =>
row.tableName.toLowerCase().includes(searchQuery.toLowerCase())
Expand Down Expand Up @@ -156,7 +156,7 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
title={<Label variant='headline'>Initial Copy</Label>}
toolbar={{
left: (
<>
<div style={{ display: 'flex', alignItems: 'center' }}>
<Button variant='normalBorderless' onClick={handlePrevPage}>
<Icon name='chevron_left' />
</Button>
Expand All @@ -170,21 +170,27 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
>
<Icon name='refresh' />
</Button>
<ReactSelect
options={sortOptions}
onChange={(val, _) => {
const sortVal =
(val?.value as 'cloneStartTime' | 'avgTimePerPartition') ??
'cloneStartTime';
setSortField(sortVal);
}}
value={{
value: sortField,
label: sortOptions.find((opt) => opt.value === sortField)
?.label,
}}
defaultValue={{ value: 'cloneStartTime', label: 'Start Time' }}
/>
<div style={{ minWidth: '10em' }}>
<ReactSelect
options={sortOptions}
onChange={(val, _) => {
const sortVal =
(val?.value as
| 'cloneStartTime'
| 'avgTimePerPartition') ?? 'cloneStartTime';
setSortField(sortVal);
}}
value={{
value: sortField,
label: sortOptions.find((opt) => opt.value === sortField)
?.label,
}}
defaultValue={{
value: 'cloneStartTime',
label: 'Start Time',
}}
/>
</div>
<button
className='IconButton'
onClick={() => setSortDir('asc')}
Expand All @@ -201,7 +207,7 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
>
<Icon name='arrow_downward' />
</button>
</>
</div>
),
right: (
<SearchField
Expand Down
44 changes: 23 additions & 21 deletions ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => {
title={<Label variant='headline'>CDC Syncs</Label>}
toolbar={{
left: (
<>
<div style={{ display: 'flex', alignItems: 'center' }}>
<Button variant='normalBorderless' onClick={handlePrevPage}>
<Icon name='chevron_left' />
</Button>
Expand All @@ -118,24 +118,26 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => {
>
<Icon name='refresh' />
</Button>
<ReactSelect
options={sortOptions}
value={{
value: sortField,
label: sortOptions.find((opt) => opt.value === sortField)
?.label,
}}
onChange={(val, _) => {
const sortVal =
(val?.value as
| 'startTime'
| 'endTime'
| 'numRows'
| 'batchId') ?? 'batchId';
setSortField(sortVal);
}}
defaultValue={{ value: 'batchId', label: 'Batch ID' }}
/>
<div style={{ minWidth: '10em' }}>
<ReactSelect
options={sortOptions}
value={{
value: sortField,
label: sortOptions.find((opt) => opt.value === sortField)
?.label,
}}
onChange={(val, _) => {
const sortVal =
(val?.value as
| 'startTime'
| 'endTime'
| 'numRows'
| 'batchId') ?? 'batchId';
setSortField(sortVal);
}}
defaultValue={{ value: 'batchId', label: 'Batch ID' }}
/>
</div>
<button
className='IconButton'
onClick={() => setSortDir('asc')}
Expand All @@ -152,7 +154,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => {
>
<Icon name='arrow_downward' />
</button>
</>
</div>
),
right: (
<SearchField
Expand Down Expand Up @@ -180,7 +182,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => {
{displayedRows.map((row) => (
<TableRow key={row.batchId}>
<TableCell>
<Label>{row.batchId}</Label>
<Label>{Number(row.batchId)}</Label>
</TableCell>
<TableCell>
<Label>
Expand Down
5 changes: 3 additions & 2 deletions ui/app/mirrors/status/qrep/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export default async function QRepMirrorStatus({
runUuid: run.run_uuid,
startTime: run.start_time,
endTime: run.end_time,
numRows: run.rows_in_partition,
pulledRows: run.rows_in_partition,
syncedRows: run.rows_synced,
status: '',
};
return ret;
Expand All @@ -50,7 +51,7 @@ export default async function QRepMirrorStatus({
partitionID: partition.partitionId,
startTime: partition.startTime,
endTime: partition.endTime,
numRows: partition.numRows,
numRows: partition.pulledRows,
}))}
/>
<br></br>
Expand Down
Loading

0 comments on commit 829f9a8

Please sign in to comment.