diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 643247b542..b655607989 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -446,7 +446,7 @@ func (h *FlowRequestHandler) FlowStateChange( } if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && *currState == protos.FlowStatus_STATUS_RUNNING { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum()) + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING) if err != nil { return nil, err } @@ -468,7 +468,7 @@ func (h *FlowRequestHandler) FlowStateChange( ) } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && (*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum()) + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING) if err != nil { return nil, err } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 6c29bd39b0..45c2a134e3 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -341,8 +341,10 @@ func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID s return state, nil } -func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context, - workflowID string, state *protos.FlowStatus, +func (h *FlowRequestHandler) updateWorkflowStatus( + ctx context.Context, + workflowID string, + state protos.FlowStatus, ) error { _, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state) if err != nil { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 835e367f69..88f4fb3486 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -143,7 +143,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, slog.Error(err.Error()) } - if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { + if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { break } } else { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e0ede98131..ebbf15f807 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -48,7 +48,7 @@ type CDCFlowWorkflowState struct { // Needed to support schema changes. RelationMessageMapping model.RelationMessageMapping // current workflow state - CurrentFlowState *protos.FlowStatus + CurrentFlowState protos.FlowStatus } type SignalProps struct { @@ -72,7 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { RelationName: "protobuf_workaround", }, }, - CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(), + CurrentFlowState: protos.FlowStatus_STATUS_SETUP, } } @@ -171,13 +171,13 @@ func CDCFlowWorkflowWithConfig( if err != nil { return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) } - err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { + err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) { return state.CurrentFlowState, nil }) if err != nil { return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) } - err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { + err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status protos.FlowStatus) error { state.CurrentFlowState = status return nil }) @@ -193,7 +193,7 @@ func CDCFlowWorkflowWithConfig( // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. - if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING.Enum() { + if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING { // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { @@ -223,7 +223,7 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } - state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName) @@ -285,7 +285,7 @@ func CDCFlowWorkflowWithConfig( } } - state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING state.Progress = append(state.Progress, "executed setup flow and snapshot flow") // if initial_copy_only is opted for, we end the flow here. @@ -333,7 +333,7 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() - state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal @@ -352,11 +352,11 @@ func CDCFlowWorkflowWithConfig( // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { w.logger.Info("peer flow has been shutdown") - state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED return state, nil } - state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING // check if total sync flows have been completed // since this happens immediately after we check for signals, the case of a signal being missed