From b797ff186783701031e4f5c03410171a48748358 Mon Sep 17 00:00:00 2001
From: Kaushik Iska <iska.kaushik@gmail.com>
Date: Fri, 5 Jan 2024 09:02:54 -0500
Subject: [PATCH 1/2] Make mirror status query not proportional to number of
 partitions

---
 flow/cmd/mirror_status.go | 63 ++++++++++++++++++++-------------------
 1 file changed, 32 insertions(+), 31 deletions(-)

diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go
index 332e186665..b1ff64077b 100644
--- a/flow/cmd/mirror_status.go
+++ b/flow/cmd/mirror_status.go
@@ -110,21 +110,12 @@ func (h *FlowRequestHandler) QRepFlowStatus(
 	ctx context.Context,
 	req *protos.MirrorStatusRequest,
 ) (*protos.QRepMirrorStatus, error) {
-	parts, err := h.getPartitionUUIDs(ctx, req.FlowJobName)
+	partitionStatuses, err := h.getPartitionStatuses(ctx, req.FlowJobName)
 	if err != nil {
+		slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", req.FlowJobName, err.Error()))
 		return nil, err
 	}
 
-	partitionStatuses := []*protos.PartitionStatus{}
-	for _, part := range parts {
-		partitionStatus, err := h.getPartitionStatus(ctx, part)
-		if err != nil {
-			return nil, err
-		}
-
-		partitionStatuses = append(partitionStatuses, partitionStatus)
-	}
-
 	return &protos.QRepMirrorStatus{
 		// The clone table jobs that are children of the CDC snapshot flow
 		// do not have a config entry, so allow this to be nil.
@@ -133,38 +124,48 @@ func (h *FlowRequestHandler) QRepFlowStatus(
 	}, nil
 }
 
-// getPartitionStatus returns the status of a partition uuid.
-func (h *FlowRequestHandler) getPartitionStatus(
+func (h *FlowRequestHandler) getPartitionStatuses(
 	ctx context.Context,
-	partitionUUID string,
-) (*protos.PartitionStatus, error) {
-	partitionStatus := &protos.PartitionStatus{
-		PartitionId: partitionUUID,
+	flowJobName string,
+) ([]*protos.PartitionStatus, error) {
+	q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE flow_name = $1"
+	rows, err := h.pool.Query(ctx, q, flowJobName)
+	if err != nil {
+		slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", flowJobName, err.Error()))
+		return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
 	}
 
+	defer rows.Close()
+
+	res := []*protos.PartitionStatus{}
 	var startTime pgtype.Timestamp
 	var endTime pgtype.Timestamp
 	var numRows pgtype.Int4
 
-	q := "SELECT start_time, end_time, rows_in_partition FROM peerdb_stats.qrep_partitions WHERE partition_uuid = $1"
-	err := h.pool.QueryRow(ctx, q, partitionUUID).Scan(&startTime, &endTime, &numRows)
-	if err != nil {
-		return nil, fmt.Errorf("unable to query qrep partition - %s: %w", partitionUUID, err)
-	}
+	for rows.Next() {
+		if err := rows.Scan(&startTime, &endTime, &numRows); err != nil {
+			slog.Error(fmt.Sprintf("unable to scan qrep partition - %s: %s", flowJobName, err.Error()))
+			return nil, fmt.Errorf("unable to scan qrep partition - %s: %w", flowJobName, err)
+		}
 
-	if startTime.Valid {
-		partitionStatus.StartTime = timestamppb.New(startTime.Time)
-	}
+		partitionStatus := &protos.PartitionStatus{}
 
-	if endTime.Valid {
-		partitionStatus.EndTime = timestamppb.New(endTime.Time)
-	}
+		if startTime.Valid {
+			partitionStatus.StartTime = timestamppb.New(startTime.Time)
+		}
+
+		if endTime.Valid {
+			partitionStatus.EndTime = timestamppb.New(endTime.Time)
+		}
+
+		if numRows.Valid {
+			partitionStatus.NumRows = numRows.Int32
+		}
 
-	if numRows.Valid {
-		partitionStatus.NumRows = numRows.Int32
+		res = append(res, partitionStatus)
 	}
 
-	return partitionStatus, nil
+	return res, nil
 }
 
 func (h *FlowRequestHandler) getPartitionUUIDs(

From f59e148502e232f008ad03076794febcb8177eb1 Mon Sep 17 00:00:00 2001
From: Kaushik Iska <iska.kaushik@gmail.com>
Date: Fri, 5 Jan 2024 09:07:55 -0500
Subject: [PATCH 2/2] remove unused method

---
 flow/cmd/mirror_status.go | 23 -----------------------
 1 file changed, 23 deletions(-)

diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go
index b1ff64077b..65f9886f51 100644
--- a/flow/cmd/mirror_status.go
+++ b/flow/cmd/mirror_status.go
@@ -168,29 +168,6 @@ func (h *FlowRequestHandler) getPartitionStatuses(
 	return res, nil
 }
 
-func (h *FlowRequestHandler) getPartitionUUIDs(
-	ctx context.Context,
-	flowJobName string,
-) ([]string, error) {
-	rows, err := h.pool.Query(ctx,
-		"SELECT partition_uuid FROM peerdb_stats.qrep_partitions WHERE flow_name = $1", flowJobName)
-	if err != nil {
-		return nil, fmt.Errorf("unable to query qrep partitions: %w", err)
-	}
-	defer rows.Close()
-
-	partitionUUIDs := []string{}
-	for rows.Next() {
-		var partitionUUID pgtype.Text
-		if err := rows.Scan(&partitionUUID); err != nil {
-			return nil, fmt.Errorf("unable to scan partition row: %w", err)
-		}
-		partitionUUIDs = append(partitionUUIDs, partitionUUID.String)
-	}
-
-	return partitionUUIDs, nil
-}
-
 func (h *FlowRequestHandler) getFlowConfigFromCatalog(
 	flowJobName string,
 ) (*protos.FlowConnectionConfigs, error) {