diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 3404bbfa25..0fa1110a85 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -214,7 +214,6 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( selector workflow.Selector, state *CDCFlowWorkflowState, ) { - // add a signal to change CDC properties cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx) cdcPropertiesSignalChan.AddToSelector(selector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) { // only modify for options since SyncFlow uses it @@ -240,21 +239,11 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( } func (w *CDCFlowWorkflowExecution) startSyncFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions) { - w.syncFlowFuture = workflow.ExecuteChildWorkflow( - ctx, - SyncFlowWorkflow, - config, - options, - ) + w.syncFlowFuture = workflow.ExecuteChildWorkflow(ctx, SyncFlowWorkflow, config, options) } func (w *CDCFlowWorkflowExecution) startNormFlow(ctx workflow.Context, config *protos.FlowConnectionConfigs) { - w.normFlowFuture = workflow.ExecuteChildWorkflow( - ctx, - NormalizeFlowWorkflow, - config, - nil, - ) + w.normFlowFuture = workflow.ExecuteChildWorkflow(ctx, NormalizeFlowWorkflow, config, nil) } func CDCFlowWorkflow( @@ -499,6 +488,7 @@ func CDCFlowWorkflow( if restart { w.logger.Info("sync finished, finishing normalize") + w.syncFlowFuture = nil _ = model.NormalizeSignal.SignalChildWorkflow(ctx, w.normFlowFuture, model.NormalizePayload{ Done: true, SyncBatchID: -1, @@ -519,6 +509,7 @@ func CDCFlowWorkflow( if restart { w.logger.Info("normalize finished") + w.normFlowFuture = nil finished = true } else { w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err)) @@ -576,7 +567,9 @@ func CDCFlowWorkflow( if !parallel { normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { - _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, x).Get(ctx, nil) + if w.syncFlowFuture != nil { + _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, x).Get(ctx, nil) + } }) }