Skip to content

Commit

Permalink
Make mirror status query not proportional to number of partitions (#992)
Browse files Browse the repository at this point in the history
We would iterate through each partition in the qrep flow and make a
query and then render the mirror status page. This reduces all of them
to just one query.
  • Loading branch information
iskakaushik authored Jan 5, 2024
1 parent ee36ff1 commit 09b7b35
Showing 1 changed file with 30 additions and 52 deletions.
82 changes: 30 additions & 52 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,12 @@ func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.QRepMirrorStatus, error) {
parts, err := h.getPartitionUUIDs(ctx, req.FlowJobName)
partitionStatuses, err := h.getPartitionStatuses(ctx, req.FlowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", req.FlowJobName, err.Error()))
return nil, err
}

partitionStatuses := []*protos.PartitionStatus{}
for _, part := range parts {
partitionStatus, err := h.getPartitionStatus(ctx, part)
if err != nil {
return nil, err
}

partitionStatuses = append(partitionStatuses, partitionStatus)
}

return &protos.QRepMirrorStatus{
// The clone table jobs that are children of the CDC snapshot flow
// do not have a config entry, so allow this to be nil.
Expand All @@ -133,61 +124,48 @@ func (h *FlowRequestHandler) QRepFlowStatus(
}, nil
}

// getPartitionStatus returns the status of a partition uuid.
func (h *FlowRequestHandler) getPartitionStatus(
func (h *FlowRequestHandler) getPartitionStatuses(
ctx context.Context,
partitionUUID string,
) (*protos.PartitionStatus, error) {
partitionStatus := &protos.PartitionStatus{
PartitionId: partitionUUID,
flowJobName string,
) ([]*protos.PartitionStatus, error) {
q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE flow_name = $1"
rows, err := h.pool.Query(ctx, q, flowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
}

defer rows.Close()

res := []*protos.PartitionStatus{}
var startTime pgtype.Timestamp
var endTime pgtype.Timestamp
var numRows pgtype.Int4

q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE partition_uuid = $1"
err := h.pool.QueryRow(ctx, q, partitionUUID).Scan(&startTime, &endTime, &numRows)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", partitionUUID, err)
}

if startTime.Valid {
partitionStatus.StartTime = timestamppb.New(startTime.Time)
}

if endTime.Valid {
partitionStatus.EndTime = timestamppb.New(endTime.Time)
}
for rows.Next() {
if err := rows.Scan(&startTime, &endTime, &numRows); err != nil {
slog.Error(fmt.Sprintf("unable to scan qrep partition - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to scan qrep partition - %s: %w", flowJobName, err)
}

if numRows.Valid {
partitionStatus.NumRows = numRows.Int32
}
partitionStatus := &protos.PartitionStatus{}

return partitionStatus, nil
}
if startTime.Valid {
partitionStatus.StartTime = timestamppb.New(startTime.Time)
}

func (h *FlowRequestHandler) getPartitionUUIDs(
ctx context.Context,
flowJobName string,
) ([]string, error) {
rows, err := h.pool.Query(ctx,
"SELECT partition_uuid FROM peerdb_stats.qrep_partitions WHERE flow_name = $1", flowJobName)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partitions: %w", err)
}
defer rows.Close()
if endTime.Valid {
partitionStatus.EndTime = timestamppb.New(endTime.Time)
}

partitionUUIDs := []string{}
for rows.Next() {
var partitionUUID pgtype.Text
if err := rows.Scan(&partitionUUID); err != nil {
return nil, fmt.Errorf("unable to scan partition row: %w", err)
if numRows.Valid {
partitionStatus.NumRows = numRows.Int32
}
partitionUUIDs = append(partitionUUIDs, partitionUUID.String)

res = append(res, partitionStatus)
}

return partitionUUIDs, nil
return res, nil
}

func (h *FlowRequestHandler) getFlowConfigFromCatalog(
Expand Down

0 comments on commit 09b7b35

Please sign in to comment.