From b797ff186783701031e4f5c03410171a48748358 Mon Sep 17 00:00:00 2001 From: Kaushik Iska <iska.kaushik@gmail.com> Date: Fri, 5 Jan 2024 09:02:54 -0500 Subject: [PATCH 1/2] Make mirror status query not proportional to number of partitions --- flow/cmd/mirror_status.go | 63 ++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 332e186665..b1ff64077b 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -110,21 +110,12 @@ func (h *FlowRequestHandler) QRepFlowStatus( ctx context.Context, req *protos.MirrorStatusRequest, ) (*protos.QRepMirrorStatus, error) { - parts, err := h.getPartitionUUIDs(ctx, req.FlowJobName) + partitionStatuses, err := h.getPartitionStatuses(ctx, req.FlowJobName) if err != nil { + slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", req.FlowJobName, err.Error())) return nil, err } - partitionStatuses := []*protos.PartitionStatus{} - for _, part := range parts { - partitionStatus, err := h.getPartitionStatus(ctx, part) - if err != nil { - return nil, err - } - - partitionStatuses = append(partitionStatuses, partitionStatus) - } - return &protos.QRepMirrorStatus{ // The clone table jobs that are children of the CDC snapshot flow // do not have a config entry, so allow this to be nil. @@ -133,38 +124,48 @@ func (h *FlowRequestHandler) QRepFlowStatus( }, nil } -// getPartitionStatus returns the status of a partition uuid. -func (h *FlowRequestHandler) getPartitionStatus( +func (h *FlowRequestHandler) getPartitionStatuses( ctx context.Context, - partitionUUID string, -) (*protos.PartitionStatus, error) { - partitionStatus := &protos.PartitionStatus{ - PartitionId: partitionUUID, + flowJobName string, +) ([]*protos.PartitionStatus, error) { + q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE flow_name = $1" + rows, err := h.pool.Query(ctx, q, flowJobName) + if err != nil { + slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", flowJobName, err.Error())) + return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err) } + defer rows.Close() + + res := []*protos.PartitionStatus{} var startTime pgtype.Timestamp var endTime pgtype.Timestamp var numRows pgtype.Int4 - q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE partition_uuid = $1" - err := h.pool.QueryRow(ctx, q, partitionUUID).Scan(&startTime, &endTime, &numRows) - if err != nil { - return nil, fmt.Errorf("unable to query qrep partition - %s: %w", partitionUUID, err) - } + for rows.Next() { + if err := rows.Scan(&startTime, &endTime, &numRows); err != nil { + slog.Error(fmt.Sprintf("unable to scan qrep partition - %s: %s", flowJobName, err.Error())) + return nil, fmt.Errorf("unable to scan qrep partition - %s: %w", flowJobName, err) + } - if startTime.Valid { - partitionStatus.StartTime = timestamppb.New(startTime.Time) - } + partitionStatus := &protos.PartitionStatus{} - if endTime.Valid { - partitionStatus.EndTime = timestamppb.New(endTime.Time) - } + if startTime.Valid { + partitionStatus.StartTime = timestamppb.New(startTime.Time) + } + + if endTime.Valid { + partitionStatus.EndTime = timestamppb.New(endTime.Time) + } + + if numRows.Valid { + partitionStatus.NumRows = numRows.Int32 + } - if numRows.Valid { - partitionStatus.NumRows = numRows.Int32 + res = append(res, partitionStatus) } - return partitionStatus, nil + return res, nil } func (h *FlowRequestHandler) getPartitionUUIDs( From f59e148502e232f008ad03076794febcb8177eb1 Mon Sep 17 00:00:00 2001 From: Kaushik Iska <iska.kaushik@gmail.com> Date: Fri, 5 Jan 2024 09:07:55 -0500 Subject: [PATCH 2/2] remove unused method --- flow/cmd/mirror_status.go | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index b1ff64077b..65f9886f51 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -168,29 +168,6 @@ func (h *FlowRequestHandler) getPartitionStatuses( return res, nil } -func (h *FlowRequestHandler) getPartitionUUIDs( - ctx context.Context, - flowJobName string, -) ([]string, error) { - rows, err := h.pool.Query(ctx, - "SELECT partition_uuid FROM peerdb_stats.qrep_partitions WHERE flow_name = $1", flowJobName) - if err != nil { - return nil, fmt.Errorf("unable to query qrep partitions: %w", err) - } - defer rows.Close() - - partitionUUIDs := []string{} - for rows.Next() { - var partitionUUID pgtype.Text - if err := rows.Scan(&partitionUUID); err != nil { - return nil, fmt.Errorf("unable to scan partition row: %w", err) - } - partitionUUIDs = append(partitionUUIDs, partitionUUID.String) - } - - return partitionUUIDs, nil -} - func (h *FlowRequestHandler) getFlowConfigFromCatalog( flowJobName string, ) (*protos.FlowConnectionConfigs, error) {