Skip to content

Commit

Permalink
make summary even faster
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 5, 2024
1 parent 0900c31 commit 9ccb46b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 66 deletions.
72 changes: 68 additions & 4 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,13 @@ func (h *FlowRequestHandler) CDCFlowStatus(
return nil, err
}

cloneStatuses := []*protos.QRepMirrorStatus{}
cloneStatuses := []*protos.CloneTableSummary{}
for _, cloneJobName := range cloneJobNames {
cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: cloneJobName,
})
cloneStatus, err := h.cloneTableSummary(ctx, cloneJobName)
if err != nil {
return nil, err
}

cloneStatuses = append(cloneStatuses, cloneStatus)
}

Expand All @@ -106,6 +105,71 @@ func (h *FlowRequestHandler) CDCFlowStatus(
}, nil
}

func (h *FlowRequestHandler) cloneTableSummary(
ctx context.Context,
flowJobName string,
) (*protos.CloneTableSummary, error) {
cfg := h.getQRepConfigFromCatalog(flowJobName)
res := &protos.CloneTableSummary{
FlowJobName: flowJobName,
TableName: cfg.DestinationTableIdentifier,
}

q := `
SELECT
MIN(start_time) AS StartTime,
COUNT(*) AS NumPartitionsTotal,
COUNT(CASE WHEN end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(rows_in_partition) FILTER (WHERE end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (end_time - start_time)) * 1000) FILTER (WHERE end_time IS NOT NULL) AS AvgTimePerPartitionMs
FROM
peerdb_stats.qrep_partitions
WHERE
flow_name = $1
GROUP BY
flow_name;
`

var startTime pgtype.Timestamp
var numPartitionsTotal pgtype.Int8
var numPartitionsCompleted pgtype.Int8
var numRowsSynced pgtype.Int8
var avgTimePerPartitionMs pgtype.Float8

err := h.pool.QueryRow(ctx, q, flowJobName).Scan(
&startTime,
&numPartitionsTotal,
&numPartitionsCompleted,
&numRowsSynced,
&avgTimePerPartitionMs,
)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
}

if startTime.Valid {
res.StartTime = timestamppb.New(startTime.Time)
}

if numPartitionsTotal.Valid {
res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
}

if numPartitionsCompleted.Valid {
res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64)
}

if numRowsSynced.Valid {
res.NumRowsSynced = int64(numRowsSynced.Int64)
}

if avgTimePerPartitionMs.Valid {
res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64)
}

return res, nil
}

func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
Expand Down
12 changes: 11 additions & 1 deletion protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,18 @@ message PeerStatResponse {
repeated StatInfo stat_data = 1;
}

message CloneTableSummary {
string table_name = 1;
google.protobuf.Timestamp start_time = 2;
int32 num_partitions_completed = 3;
int32 num_partitions_total = 4;
int64 num_rows_synced = 5;
int64 avg_time_per_partition_ms = 6;
string flow_job_name = 7;
}

message SnapshotStatus {
repeated QRepMirrorStatus clones = 1;
repeated CloneTableSummary clones = 1;
}

message CDCMirrorStatus {
Expand Down
85 changes: 24 additions & 61 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import TimeLabel from '@/components/TimeComponent';
import {
CDCMirrorStatus,
QRepMirrorStatus,
CloneTableSummary,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
Expand All @@ -20,74 +20,36 @@ import ReactSelect from 'react-select';
import CdcDetails from './cdcDetails';

class TableCloneSummary {
flowJobName: string;
tableName: string;
totalNumPartitions: number;
totalNumRows: number;
completedNumPartitions: number;
completedNumRows: number;
avgTimePerPartition: Duration | null;
cloneStartTime: Moment | null;
cloneStartTime: Moment | null = null;
cloneTableSummary: CloneTableSummary;
avgTimePerPartition: Duration | null = null;

constructor(clone: QRepMirrorStatus) {
this.flowJobName = clone.config?.flowJobName || '';
this.tableName = clone.config?.watermarkTable || '';
this.totalNumPartitions = 0;
this.totalNumRows = 0;
this.completedNumPartitions = 0;
this.completedNumRows = 0;
this.avgTimePerPartition = null;
this.cloneStartTime = null;

this.calculate(clone);
}

private calculate(clone: QRepMirrorStatus): void {
let totalTime = moment.duration(0);
clone.partitions?.forEach((partition) => {
this.totalNumPartitions++;
this.totalNumRows += partition.numRows;

if (partition.startTime) {
let st = moment(partition.startTime);
if (!this.cloneStartTime || st.isBefore(this.cloneStartTime)) {
this.cloneStartTime = st;
}
}

if (partition.endTime) {
this.completedNumPartitions++;
this.completedNumRows += partition.numRows;
let st = moment(partition.startTime);
let et = moment(partition.endTime);
let duration = moment.duration(et.diff(st));
totalTime = totalTime.add(duration);
}
});

if (this.completedNumPartitions > 0) {
constructor(clone: CloneTableSummary) {
this.cloneTableSummary = clone;
if (clone.startTime) {
this.cloneStartTime = moment(clone.startTime);
}
if (clone.avgTimePerPartitionMs) {
this.avgTimePerPartition = moment.duration(
totalTime.asMilliseconds() / this.completedNumPartitions
clone.avgTimePerPartitionMs,
'ms'
);
}
}

getRowProgressPercentage(): number {
if (this.totalNumRows === 0) {
return 0;
}
return (this.completedNumRows / this.totalNumRows) * 100;
}

getPartitionProgressPercentage(): number {
if (this.totalNumPartitions === 0) {
if (this.cloneTableSummary.numPartitionsTotal === 0) {
return 0;
}
return (this.completedNumPartitions / this.totalNumPartitions) * 100;
return (
(this.cloneTableSummary.numPartitionsCompleted /
this.cloneTableSummary.numPartitionsTotal) *
100
);
}
}

function summarizeTableClone(clone: QRepMirrorStatus): TableCloneSummary {
function summarizeTableClone(clone: CloneTableSummary): TableCloneSummary {
return new TableCloneSummary(clone);
}

Expand Down Expand Up @@ -231,10 +193,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
<TableCell>
<Label>
<Link
href={`/mirrors/status/qrep/${clone.flowJobName}`}
href={`/mirrors/status/qrep/${clone.cloneTableSummary.flowJobName}`}
className='underline cursor-pointer'
>
{clone.tableName}
{clone.cloneTableSummary.tableName}
</Link>
</Label>
</TableCell>
Expand All @@ -247,9 +209,10 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
</TableCell>
<TableCell>
<ProgressBar progress={clone.getPartitionProgressPercentage()} />
{clone.completedNumPartitions} / {clone.totalNumPartitions}
{clone.cloneTableSummary.numPartitionsCompleted} /{' '}
{clone.cloneTableSummary.numPartitionsTotal}
</TableCell>
<TableCell>{clone.completedNumRows}</TableCell>
<TableCell>{clone.cloneTableSummary.numRowsSynced}</TableCell>
<TableCell>
<Label>
{clone.avgTimePerPartition?.humanize({ ss: 1 }) || 'N/A'}
Expand Down

0 comments on commit 9ccb46b

Please sign in to comment.