Skip to content

Commit

Permalink
Merge branch 'main' into wait-for
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 7, 2024
2 parents 21fd480 + e3c8304 commit 446df16
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 21 deletions.
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (h *FlowRequestHandler) FlowStateChange(
}
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
*currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum())
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
Expand All @@ -468,7 +468,7 @@ func (h *FlowRequestHandler) FlowStateChange(
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum())
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,10 @@ func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID s
return state, nil
}

func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context,
workflowID string, state *protos.FlowStatus,
func (h *FlowRequestHandler) updateWorkflowStatus(
ctx context.Context,
workflowID string,
state protos.FlowStatus,
) error {
_, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen
if err == nil {
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err != nil {
if err == nil {
if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
return
}
} else {
slog.Error(err.Error())
}

if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
return
}
} else if counter > 15 {
t.Error("UNEXPECTED SETUP CDC TIMEOUT", err.Error())
env.CancelWorkflow()
Expand Down
20 changes: 10 additions & 10 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type CDCFlowWorkflowState struct {
// Needed to support schema changes.
RelationMessageMapping model.RelationMessageMapping
// current workflow state
CurrentFlowState *protos.FlowStatus
CurrentFlowState protos.FlowStatus
}

type SignalProps struct {
Expand All @@ -72,7 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
RelationName: "protobuf_workaround",
},
},
CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(),
CurrentFlowState: protos.FlowStatus_STATUS_SETUP,
}
}

Expand Down Expand Up @@ -171,13 +171,13 @@ func CDCFlowWorkflowWithConfig(
if err != nil {
return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err)
}
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) {
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) {
return state.CurrentFlowState, nil
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
}
err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status protos.FlowStatus) error {
state.CurrentFlowState = status
return nil
})
Expand All @@ -193,7 +193,7 @@ func CDCFlowWorkflowWithConfig(
// because Resync modifies TableMappings before Setup and also before Snapshot
// for safety, rely on the idempotency of SetupFlow instead
// also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here.
if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING.Enum() {
if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING {
// if resync is true, alter the table name schema mapping to temporarily add
// a suffix to the table names.
if cfg.Resync {
Expand Down Expand Up @@ -224,7 +224,7 @@ func CDCFlowWorkflowWithConfig(
if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
}
state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum()
state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT

// next part of the setup is to snapshot-initial-copy and setup replication slots.
snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName)
Expand Down Expand Up @@ -287,7 +287,7 @@ func CDCFlowWorkflowWithConfig(
}
}

state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum()
state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")

// if initial_copy_only is opted for, we end the flow here.
Expand Down Expand Up @@ -339,7 +339,7 @@ func CDCFlowWorkflowWithConfig(

if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum()
state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
var signalVal shared.CDCFlowSignal

Expand All @@ -360,11 +360,11 @@ func CDCFlowWorkflowWithConfig(
// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
w.logger.Info("peer flow has been shutdown")
state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum()
state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED
return state, nil
}

state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum()
state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING

// check if total sync flows have been completed
// since this happens immediately after we check for signals, the case of a signal being missed
Expand Down
4 changes: 2 additions & 2 deletions stacks/flow.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1.2

FROM golang:1.21.3-bookworm AS builder
FROM golang:1.21-bookworm AS builder
RUN apt-get update && apt-get install -y gcc libgeos-dev
WORKDIR /root/flow

Expand All @@ -19,7 +19,7 @@ ENV CGO_ENABLED=1
RUN go build -ldflags="-s -w" -o /root/peer-flow .

FROM debian:bookworm-slim AS flow-base
RUN apt-get update && apt-get install -y ca-certificates gcc libgeos-dev
RUN apt-get update && apt-get install -y ca-certificates libgeos-c1v5
WORKDIR /root
COPY --from=builder /root/peer-flow .

Expand Down

0 comments on commit 446df16

Please sign in to comment.