diff --git a/flow/model/signals.go b/flow/model/signals.go index 4a677f57a9..5e30defd63 100644 --- a/flow/model/signals.go +++ b/flow/model/signals.go @@ -130,8 +130,8 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{ Name: "cdc-dynamic-properties", } -var SyncSignal = TypedSignal[bool]{ - Name: "sync", +var SyncStopSignal = TypedSignal[struct{}]{ + Name: "sync-stop", } var SyncErrorSignal = TypedSignal[string]{ diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 44b3789346..7520442cd5 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -428,7 +428,7 @@ func CDCFlowWorkflowWithConfig( flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) { state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger) if state.ActiveSignal == model.PauseSignal { - model.SyncSignal.SignalChildWorkflow(ctx, syncFlowFuture, true) + model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}) } }) @@ -466,8 +466,8 @@ func CDCFlowWorkflowWithConfig( if !peerdbenv.PeerDBEnableParallelSyncNormalize() { normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) - normDoneChan.AddToSelector(mainLoopSelector, func(_ struct{}, _ bool) { - model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}) + normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { + model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x) }) } @@ -500,7 +500,7 @@ func CDCFlowWorkflowWithConfig( mainLoopSelector.Select(ctx) } if err := ctx.Err(); err != nil { - model.SyncSignal.SignalChildWorkflow(ctx, syncFlowFuture, true) + model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}) finishSyncNormalize() return state, err } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 3b2326580e..7a1fc11571 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -82,14 +82,19 @@ func SyncFlowWorkflow( waitChan = model.NormalizeDoneSignal.GetSignalChannel(ctx) } - var stopLoop bool + var stop bool currentSyncFlowNum := 0 totalRecordsSynced := int64(0) selector := workflow.NewNamedSelector(ctx, "Sync Loop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) - for !stopLoop && ctx.Err() == nil { + stopChan := model.SyncStopSignal.GetSignalChannel(ctx) + stopChan.AddToSelector(selector, func(_ struct{}, _ bool) { + stop = true + }) + + for !stop && ctx.Err() == nil { var syncDone, syncErr bool mustWait := waitChan.Chan != nil @@ -214,5 +219,8 @@ func SyncFlowWorkflow( logger.Info("sync canceled: %v", err) return err } + if stop { + return nil + } return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options) }