From 504a038740b93af6bcd171857d99ec9b3a72c3b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sun, 7 Jan 2024 17:19:24 +0000 Subject: [PATCH 1/2] flow.Dockerfile: go 1.21.x & don't install dev packages on base (#1010) --- stacks/flow.Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index 6cb36b167a..3dc3221133 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1.2 -FROM golang:1.21.3-bookworm AS builder +FROM golang:1.21-bookworm AS builder RUN apt-get update && apt-get install -y gcc libgeos-dev WORKDIR /root/flow @@ -19,7 +19,7 @@ ENV CGO_ENABLED=1 RUN go build -ldflags="-s -w" -o /root/peer-flow . FROM debian:bookworm-slim AS flow-base -RUN apt-get update && apt-get install -y ca-certificates gcc libgeos-dev +RUN apt-get update && apt-get install -y ca-certificates libgeos-c1v5 WORKDIR /root COPY --from=builder /root/peer-flow . From e3c8304ae1c0ab09a08509b4e2d0ad91f4ead731 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 7 Jan 2024 15:23:08 -0500 Subject: [PATCH 2/2] Store CurrentFlowStatus as a value rather than reference (#1012) --- flow/cmd/handler.go | 4 ++-- flow/cmd/mirror_status.go | 6 ++++-- flow/e2e/test_utils.go | 2 +- flow/workflows/cdc_flow.go | 20 ++++++++++---------- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 643247b542..b655607989 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -446,7 +446,7 @@ func (h *FlowRequestHandler) FlowStateChange( } if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && *currState == protos.FlowStatus_STATUS_RUNNING { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum()) + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING) if err != nil { return nil, err } @@ -468,7 +468,7 @@ func (h *FlowRequestHandler) FlowStateChange( ) } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && (*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum()) + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING) if err != nil { return nil, err } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 6c29bd39b0..45c2a134e3 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -341,8 +341,10 @@ func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID s return state, nil } -func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context, - workflowID string, state *protos.FlowStatus, +func (h *FlowRequestHandler) updateWorkflowStatus( + ctx context.Context, + workflowID string, + state protos.FlowStatus, ) error { _, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state) if err != nil { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 835e367f69..88f4fb3486 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -143,7 +143,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, slog.Error(err.Error()) } - if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { + if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { break } } else { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e0ede98131..ebbf15f807 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -48,7 +48,7 @@ type CDCFlowWorkflowState struct { // Needed to support schema changes. RelationMessageMapping model.RelationMessageMapping // current workflow state - CurrentFlowState *protos.FlowStatus + CurrentFlowState protos.FlowStatus } type SignalProps struct { @@ -72,7 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { RelationName: "protobuf_workaround", }, }, - CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(), + CurrentFlowState: protos.FlowStatus_STATUS_SETUP, } } @@ -171,13 +171,13 @@ func CDCFlowWorkflowWithConfig( if err != nil { return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) } - err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { + err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (protos.FlowStatus, error) { return state.CurrentFlowState, nil }) if err != nil { return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) } - err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { + err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status protos.FlowStatus) error { state.CurrentFlowState = status return nil }) @@ -193,7 +193,7 @@ func CDCFlowWorkflowWithConfig( // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. - if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING.Enum() { + if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING { // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { @@ -223,7 +223,7 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } - state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName) @@ -285,7 +285,7 @@ func CDCFlowWorkflowWithConfig( } } - state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING state.Progress = append(state.Progress, "executed setup flow and snapshot flow") // if initial_copy_only is opted for, we end the flow here. @@ -333,7 +333,7 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() - state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal @@ -352,11 +352,11 @@ func CDCFlowWorkflowWithConfig( // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { w.logger.Info("peer flow has been shutdown") - state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED return state, nil } - state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING // check if total sync flows have been completed // since this happens immediately after we check for signals, the case of a signal being missed