Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make mirror status query not proportional to number of partitions #992

Merged
merged 2 commits into from
Jan 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 30 additions & 52 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,12 @@ func (h *FlowRequestHandler) QRepFlowStatus(
ctx context.Context,
req *protos.MirrorStatusRequest,
) (*protos.QRepMirrorStatus, error) {
parts, err := h.getPartitionUUIDs(ctx, req.FlowJobName)
partitionStatuses, err := h.getPartitionStatuses(ctx, req.FlowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", req.FlowJobName, err.Error()))
return nil, err
}

partitionStatuses := []*protos.PartitionStatus{}
for _, part := range parts {
partitionStatus, err := h.getPartitionStatus(ctx, part)
if err != nil {
return nil, err
}

partitionStatuses = append(partitionStatuses, partitionStatus)
}

return &protos.QRepMirrorStatus{
// The clone table jobs that are children of the CDC snapshot flow
// do not have a config entry, so allow this to be nil.
Expand All @@ -133,61 +124,48 @@ func (h *FlowRequestHandler) QRepFlowStatus(
}, nil
}

// getPartitionStatus returns the status of a partition uuid.
func (h *FlowRequestHandler) getPartitionStatus(
func (h *FlowRequestHandler) getPartitionStatuses(
ctx context.Context,
partitionUUID string,
) (*protos.PartitionStatus, error) {
partitionStatus := &protos.PartitionStatus{
PartitionId: partitionUUID,
flowJobName string,
) ([]*protos.PartitionStatus, error) {
q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE flow_name = $1"
rows, err := h.pool.Query(ctx, q, flowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
}

defer rows.Close()

res := []*protos.PartitionStatus{}
var startTime pgtype.Timestamp
var endTime pgtype.Timestamp
var numRows pgtype.Int4

q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE partition_uuid = $1"
err := h.pool.QueryRow(ctx, q, partitionUUID).Scan(&startTime, &endTime, &numRows)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", partitionUUID, err)
}

if startTime.Valid {
partitionStatus.StartTime = timestamppb.New(startTime.Time)
}

if endTime.Valid {
partitionStatus.EndTime = timestamppb.New(endTime.Time)
}
for rows.Next() {
if err := rows.Scan(&startTime, &endTime, &numRows); err != nil {
slog.Error(fmt.Sprintf("unable to scan qrep partition - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to scan qrep partition - %s: %w", flowJobName, err)
}

if numRows.Valid {
partitionStatus.NumRows = numRows.Int32
}
partitionStatus := &protos.PartitionStatus{}

return partitionStatus, nil
}
if startTime.Valid {
partitionStatus.StartTime = timestamppb.New(startTime.Time)
}

func (h *FlowRequestHandler) getPartitionUUIDs(
ctx context.Context,
flowJobName string,
) ([]string, error) {
rows, err := h.pool.Query(ctx,
"SELECT partition_uuid FROM peerdb_stats.qrep_partitions WHERE flow_name = $1", flowJobName)
if err != nil {
return nil, fmt.Errorf("unable to query qrep partitions: %w", err)
}
defer rows.Close()
if endTime.Valid {
partitionStatus.EndTime = timestamppb.New(endTime.Time)
}

partitionUUIDs := []string{}
for rows.Next() {
var partitionUUID pgtype.Text
if err := rows.Scan(&partitionUUID); err != nil {
return nil, fmt.Errorf("unable to scan partition row: %w", err)
if numRows.Valid {
partitionStatus.NumRows = numRows.Int32
}
partitionUUIDs = append(partitionUUIDs, partitionUUID.String)

res = append(res, partitionStatus)
}

return partitionUUIDs, nil
return res, nil
}

func (h *FlowRequestHandler) getFlowConfigFromCatalog(
Expand Down
Loading