diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 643247b542..b655607989 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -446,7 +446,7 @@ func (h *FlowRequestHandler) FlowStateChange( } if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && *currState == protos.FlowStatus_STATUS_RUNNING { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum()) + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING) if err != nil { return nil, err } @@ -468,7 +468,7 @@ func (h *FlowRequestHandler) FlowStateChange( ) } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && (*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum()) + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING) if err != nil { return nil, err } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 6c29bd39b0..45c2a134e3 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -341,8 +341,10 @@ func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID s return state, nil } -func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context, - workflowID string, state *protos.FlowStatus, +func (h *FlowRequestHandler) updateWorkflowStatus( + ctx context.Context, + workflowID string, + state protos.FlowStatus, ) error { _, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state) if err != nil { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 95e5a8c66f..6db3d5c6fc 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -183,13 +183,13 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen if err == nil { var state peerflow.CDCFlowWorkflowState err = response.Get(&state) - if err != nil { + if err == nil { + if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { + return + } + } else { slog.Error(err.Error()) } - - if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { - return - } } else if counter > 15 { t.Error("UNEXPECTED SETUP CDC TIMEOUT", err.Error()) env.CancelWorkflow() diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 35a2f2c613..f6a5bda189 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -48,7 +48,7 @@ type CDCFlowWorkflowState struct { // Needed to support schema changes. RelationMessageMapping model.RelationMessageMapping // current workflow state - CurrentFlowState *protos.FlowStatus + CurrentFlowState protos.FlowStatus } type SignalProps struct { @@ -72,7 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { RelationName: "protobuf_workaround", }, }, - CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(), + CurrentFlowState: protos.FlowStatus_STATUS_SETUP, } } @@ -171,13 +171,13 @@ func CDCFlowWorkflowWithConfig( if err != nil { return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) } - err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { + err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) { return state.CurrentFlowState, nil }) if err != nil { return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) } - err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { + err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status protos.FlowStatus) error { state.CurrentFlowState = status return nil }) @@ -193,7 +193,7 @@ func CDCFlowWorkflowWithConfig( // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. - if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING.Enum() { + if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING { // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { @@ -224,7 +224,7 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } - state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName) @@ -287,7 +287,7 @@ func CDCFlowWorkflowWithConfig( } } - state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING state.Progress = append(state.Progress, "executed setup flow and snapshot flow") // if initial_copy_only is opted for, we end the flow here. @@ -339,7 +339,7 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() - state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal @@ -360,11 +360,11 @@ func CDCFlowWorkflowWithConfig( // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { w.logger.Info("peer flow has been shutdown") - state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED return state, nil } - state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING // check if total sync flows have been completed // since this happens immediately after we check for signals, the case of a signal being missed diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index 6cb36b167a..3dc3221133 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1.2 -FROM golang:1.21.3-bookworm AS builder +FROM golang:1.21-bookworm AS builder RUN apt-get update && apt-get install -y gcc libgeos-dev WORKDIR /root/flow @@ -19,7 +19,7 @@ ENV CGO_ENABLED=1 RUN go build -ldflags="-s -w" -o /root/peer-flow . FROM debian:bookworm-slim AS flow-base -RUN apt-get update && apt-get install -y ca-certificates gcc libgeos-dev +RUN apt-get update && apt-get install -y ca-certificates libgeos-c1v5 WORKDIR /root COPY --from=builder /root/peer-flow .