From 783640dc31b863e3c9dfc16bd8eda7e3f964ad61 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 10 Jan 2024 15:50:08 +0530 Subject: [PATCH] UI: Refactor api and control cache frontend (#1042) Refactors our initial load status fetch to now perform a join instead of iterating through flow names and running `len(clones)` number of queries. Uses `{cache : 'no-store'}` as an alternative to forcing dynamic rendering. This is the more idiomatic way of using Next.js; force-dynamic is intended only for cases where an easier migration from the Pages Router's getServerSideProps is needed. --- flow/cmd/mirror_status.go | 147 +++++++++++++----------- ui/app/mirrors/edit/[mirrorId]/page.tsx | 2 +- 2 files changed, 78 insertions(+), 71 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 546fad1791..c5743ad575 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -20,6 +20,7 @@ func (h *FlowRequestHandler) MirrorStatus( slog.Info("Mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName)) cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName) if err != nil { + slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error())) return &protos.MirrorStatusResponse{ ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()), }, nil @@ -82,21 +83,11 @@ func (h *FlowRequestHandler) CDCFlowStatus( var initialCopyStatus *protos.SnapshotStatus - cloneJobNames, err := h.getCloneTableFlowNames(ctx, req.FlowJobName) + cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName) if err != nil { return nil, err } - cloneStatuses := []*protos.CloneTableSummary{} - for _, cloneJobName := range cloneJobNames { - cloneStatus, err := h.cloneTableSummary(ctx, cloneJobName) - if err != nil { - return nil, err - } - - cloneStatuses = append(cloneStatuses, cloneStatus) - } - initialCopyStatus = &protos.SnapshotStatus{ Clones: cloneStatuses, } @@ -110,64 +101,96 @@ func (h *FlowRequestHandler) CDCFlowStatus( 
func (h *FlowRequestHandler) cloneTableSummary( ctx context.Context, flowJobName string, -) (*protos.CloneTableSummary, error) { - cfg := h.getQRepConfigFromCatalog(flowJobName) - res := &protos.CloneTableSummary{ - FlowJobName: flowJobName, - TableName: cfg.DestinationTableIdentifier, - } - +) ([]*protos.CloneTableSummary, error) { q := ` SELECT - MIN(start_time) AS StartTime, + qp.flow_name, + qr.config_proto, + MIN(qp.start_time) AS StartTime, COUNT(*) AS NumPartitionsTotal, - COUNT(CASE WHEN end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted, - SUM(rows_in_partition) FILTER (WHERE end_time IS NOT NULL) AS NumRowsSynced, - AVG(EXTRACT(EPOCH FROM (end_time - start_time)) * 1000) FILTER (WHERE end_time IS NOT NULL) AS AvgTimePerPartitionMs + COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted, + SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced, + AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs FROM - peerdb_stats.qrep_partitions + peerdb_stats.qrep_partitions qp + JOIN + peerdb_stats.qrep_runs qr + ON + qp.flow_name = qr.flow_name WHERE - flow_name = $1; + qp.flow_name ILIKE $1 + GROUP BY + qp.flow_name, qr.config_proto; ` + var flowName pgtype.Text + var configBytes []byte var startTime pgtype.Timestamp var numPartitionsTotal pgtype.Int8 var numPartitionsCompleted pgtype.Int8 var numRowsSynced pgtype.Int8 var avgTimePerPartitionMs pgtype.Float8 - err := h.pool.QueryRow(ctx, q, flowJobName).Scan( - &startTime, - &numPartitionsTotal, - &numPartitionsCompleted, - &numRowsSynced, - &avgTimePerPartitionMs, - ) + rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%") if err != nil { - return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err) + slog.Error(fmt.Sprintf("unable to query initial load partition - %s: %s", flowJobName, err.Error())) + return nil, fmt.Errorf("unable to query initial 
load partition - %s: %w", flowJobName, err) } - if startTime.Valid { - res.StartTime = timestamppb.New(startTime.Time) - } + defer rows.Close() - if numPartitionsTotal.Valid { - res.NumPartitionsTotal = int32(numPartitionsTotal.Int64) - } + cloneStatuses := []*protos.CloneTableSummary{} + for rows.Next() { + if err := rows.Scan( + &flowName, + &configBytes, + &startTime, + &numPartitionsTotal, + &numPartitionsCompleted, + &numRowsSynced, + &avgTimePerPartitionMs, + ); err != nil { + return nil, fmt.Errorf("unable to scan initial load partition - %s: %w", flowJobName, err) + } - if numPartitionsCompleted.Valid { - res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64) - } + var res protos.CloneTableSummary - if numRowsSynced.Valid { - res.NumRowsSynced = numRowsSynced.Int64 - } + if flowName.Valid { + res.FlowJobName = flowName.String + } + if startTime.Valid { + res.StartTime = timestamppb.New(startTime.Time) + } - if avgTimePerPartitionMs.Valid { - res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64) - } + if numPartitionsTotal.Valid { + res.NumPartitionsTotal = int32(numPartitionsTotal.Int64) + } - return res, nil + if numPartitionsCompleted.Valid { + res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64) + } + + if numRowsSynced.Valid { + res.NumRowsSynced = numRowsSynced.Int64 + } + + if avgTimePerPartitionMs.Valid { + res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64) + } + + if configBytes != nil { + var config protos.QRepConfig + if err := proto.Unmarshal(configBytes, &config); err != nil { + slog.Error(fmt.Sprintf("unable to unmarshal config: %s", err.Error())) + return nil, fmt.Errorf("unable to unmarshal config: %w", err) + } + res.TableName = config.DestinationTableIdentifier + } + + cloneStatuses = append(cloneStatuses, &res) + + } + return cloneStatuses, nil } func (h *FlowRequestHandler) QRepFlowStatus( @@ -243,11 +266,13 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog( err = 
h.pool.QueryRow(context.Background(), "SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes) if err != nil { + slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error())) return nil, fmt.Errorf("unable to query flow config from catalog: %w", err) } err = proto.Unmarshal(configBytes, &config) if err != nil { + slog.Error(fmt.Sprintf("unable to unmarshal flow config: %s", err.Error())) return nil, fmt.Errorf("unable to unmarshal flow config: %w", err) } @@ -299,6 +324,7 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) var query pgtype.Text err := h.pool.QueryRow(ctx, "SELECT query_string FROM flows WHERE name = $1", flowJobName).Scan(&query) if err != nil { + slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error())) return false, fmt.Errorf("unable to query flow: %w", err) } @@ -309,36 +335,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) return false, nil } -func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJobName string) ([]string, error) { - q := "SELECT flow_name FROM peerdb_stats.qrep_runs WHERE flow_name ILIKE $1" - rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%") - if err != nil { - return nil, fmt.Errorf("unable to getCloneTableFlowNames: %w", err) - } - defer rows.Close() - - flowNames := []string{} - for rows.Next() { - var name pgtype.Text - if err := rows.Scan(&name); err != nil { - return nil, fmt.Errorf("unable to scan flow row: %w", err) - } - if name.Valid { - flowNames = append(flowNames, name.String) - } - } - - return flowNames, nil -} - func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) { res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery) if err != nil { + slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) return nil, 
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) } var state *protos.FlowStatus err = res.Get(&state) if err != nil { + slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) } return state, nil @@ -351,6 +357,7 @@ func (h *FlowRequestHandler) updateWorkflowStatus( ) error { _, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state) if err != nil { + slog.Error(fmt.Sprintf("failed to update state in workflow with ID %s: %s", workflowID, err.Error())) return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err) } return nil diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/edit/[mirrorId]/page.tsx index 57a8cf9d09..4f68dcf20c 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/page.tsx @@ -19,7 +19,7 @@ function getMirrorStatusUrl(mirrorId: string) { async function getMirrorStatus(mirrorId: string) { const url = getMirrorStatusUrl(mirrorId); - const resp = await fetch(url); + const resp = await fetch(url, { cache: 'no-store' }); const json = await resp.json(); return json; }