UI: Refactor api and control cache frontend (#1042)
Refactors the initial load status fetch to perform a single join instead of iterating through the clone flow names and running `len(clones)` separate queries.

Uses `{ cache: 'no-store' }` as an alternative to forcing dynamic rendering. This is the more idiomatic Next.js approach; `force-dynamic` is intended mainly for cases where an easier migration from the Pages Router's `getServerSideProps` is needed.
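
As a rough sketch of the two alternatives in a Next.js App Router page (the helper name and URL parameter below are illustrative, not taken from this change):

```typescript
// Alternative 1: force the entire route to render dynamically on every request.
// export const dynamic = 'force-dynamic';

// Alternative 2 (used in this commit): opt a single fetch out of the data cache,
// so only this request is always fresh while the rest of the route stays cacheable.
async function getStatus(statusUrl: string): Promise<unknown> {
  const resp = await fetch(statusUrl, { cache: 'no-store' });
  return resp.json();
}
```

Scoping the opt-out to the one status fetch keeps the caching behaviour explicit at the call site instead of changing the rendering mode of the whole page.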
Amogh-Bharadwaj committed Jan 10, 2024
1 parent 9b04215 commit 783640d
Showing 2 changed files with 78 additions and 71 deletions.
147 changes: 77 additions & 70 deletions flow/cmd/mirror_status.go
@@ -20,6 +20,7 @@ func (h *FlowRequestHandler) MirrorStatus(
slog.Info("Mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error()))
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
}, nil
@@ -82,21 +83,11 @@ func (h *FlowRequestHandler) CDCFlowStatus(

var initialCopyStatus *protos.SnapshotStatus

cloneJobNames, err := h.getCloneTableFlowNames(ctx, req.FlowJobName)
cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

cloneStatuses := []*protos.CloneTableSummary{}
for _, cloneJobName := range cloneJobNames {
cloneStatus, err := h.cloneTableSummary(ctx, cloneJobName)
if err != nil {
return nil, err
}

cloneStatuses = append(cloneStatuses, cloneStatus)
}

initialCopyStatus = &protos.SnapshotStatus{
Clones: cloneStatuses,
}
@@ -110,64 +101,96 @@ func (h *FlowRequestHandler) CDCFlowStatus(
func (h *FlowRequestHandler) cloneTableSummary(
ctx context.Context,
flowJobName string,
) (*protos.CloneTableSummary, error) {
cfg := h.getQRepConfigFromCatalog(flowJobName)
res := &protos.CloneTableSummary{
FlowJobName: flowJobName,
TableName: cfg.DestinationTableIdentifier,
}

) ([]*protos.CloneTableSummary, error) {
q := `
SELECT
MIN(start_time) AS StartTime,
qp.flow_name,
qr.config_proto,
MIN(qp.start_time) AS StartTime,
COUNT(*) AS NumPartitionsTotal,
COUNT(CASE WHEN end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(rows_in_partition) FILTER (WHERE end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (end_time - start_time)) * 1000) FILTER (WHERE end_time IS NOT NULL) AS AvgTimePerPartitionMs
COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs
FROM
peerdb_stats.qrep_partitions
peerdb_stats.qrep_partitions qp
JOIN
peerdb_stats.qrep_runs qr
ON
qp.flow_name = qr.flow_name
WHERE
flow_name = $1;
qp.flow_name ILIKE $1
GROUP BY
qp.flow_name, qr.config_proto;
`

var flowName pgtype.Text
var configBytes []byte
var startTime pgtype.Timestamp
var numPartitionsTotal pgtype.Int8
var numPartitionsCompleted pgtype.Int8
var numRowsSynced pgtype.Int8
var avgTimePerPartitionMs pgtype.Float8

err := h.pool.QueryRow(ctx, q, flowJobName).Scan(
&startTime,
&numPartitionsTotal,
&numPartitionsCompleted,
&numRowsSynced,
&avgTimePerPartitionMs,
)
rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
return nil, fmt.Errorf("unable to query qrep partition - %s: %w", flowJobName, err)
slog.Error(fmt.Sprintf("unable to query initial load partition - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err)
}

if startTime.Valid {
res.StartTime = timestamppb.New(startTime.Time)
}
defer rows.Close()

if numPartitionsTotal.Valid {
res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
}
cloneStatuses := []*protos.CloneTableSummary{}
for rows.Next() {
if err := rows.Scan(
&flowName,
&configBytes,
&startTime,
&numPartitionsTotal,
&numPartitionsCompleted,
&numRowsSynced,
&avgTimePerPartitionMs,
); err != nil {
return nil, fmt.Errorf("unable to scan initial load partition - %s: %w", flowJobName, err)
}

if numPartitionsCompleted.Valid {
res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64)
}
var res protos.CloneTableSummary

if numRowsSynced.Valid {
res.NumRowsSynced = numRowsSynced.Int64
}
if flowName.Valid {
res.FlowJobName = flowName.String
}
if startTime.Valid {
res.StartTime = timestamppb.New(startTime.Time)
}

if avgTimePerPartitionMs.Valid {
res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64)
}
if numPartitionsTotal.Valid {
res.NumPartitionsTotal = int32(numPartitionsTotal.Int64)
}

return res, nil
if numPartitionsCompleted.Valid {
res.NumPartitionsCompleted = int32(numPartitionsCompleted.Int64)
}

if numRowsSynced.Valid {
res.NumRowsSynced = numRowsSynced.Int64
}

if avgTimePerPartitionMs.Valid {
res.AvgTimePerPartitionMs = int64(avgTimePerPartitionMs.Float64)
}

if configBytes != nil {
var config protos.QRepConfig
if err := proto.Unmarshal(configBytes, &config); err != nil {
slog.Error(fmt.Sprintf("unable to unmarshal config: %s", err.Error()))
return nil, fmt.Errorf("unable to unmarshal config: %w", err)
}
res.TableName = config.DestinationTableIdentifier
}

cloneStatuses = append(cloneStatuses, &res)

}
return cloneStatuses, nil
}

func (h *FlowRequestHandler) QRepFlowStatus(
@@ -243,11 +266,13 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error()))
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
slog.Error(fmt.Sprintf("unable to unmarshal flow config: %s", err.Error()))
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
}

@@ -299,6 +324,7 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
var query pgtype.Text
err := h.pool.QueryRow(ctx, "SELECT query_string FROM flows WHERE name = $1", flowJobName).Scan(&query)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error()))
return false, fmt.Errorf("unable to query flow: %w", err)
}

@@ -309,36 +335,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
return false, nil
}

func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJobName string) ([]string, error) {
q := "SELECT flow_name FROM peerdb_stats.qrep_runs WHERE flow_name ILIKE $1"
rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
return nil, fmt.Errorf("unable to getCloneTableFlowNames: %w", err)
}
defer rows.Close()

flowNames := []string{}
for rows.Next() {
var name pgtype.Text
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("unable to scan flow row: %w", err)
}
if name.Valid {
flowNames = append(flowNames, name.String)
}
}

return flowNames, nil
}

func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state *protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
return state, nil
@@ -351,6 +357,7 @@ func (h *FlowRequestHandler) updateWorkflowStatus(
) error {
_, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state)
if err != nil {
slog.Error(fmt.Sprintf("failed to update state in workflow with ID %s: %s", workflowID, err.Error()))
return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err)
}
return nil
2 changes: 1 addition & 1 deletion ui/app/mirrors/edit/[mirrorId]/page.tsx
@@ -19,7 +19,7 @@ function getMirrorStatusUrl(mirrorId: string) {

async function getMirrorStatus(mirrorId: string) {
const url = getMirrorStatusUrl(mirrorId);
const resp = await fetch(url);
const resp = await fetch(url, { cache: 'no-store' });
const json = await resp.json();
return json;
}
