diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 3adddb0af9..14ca09092d 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -428,13 +428,15 @@ func CDCFlowWorkflow( logger.Info("sync finished, finishing normalize") syncFlowFuture = nil restart = true - err = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{ - Done: true, - SyncBatchID: -1, - }).Get(ctx, nil) - if err != nil { - logger.Warn("failed to signal normalize done, finishing", slog.Any("error", err)) - finished = true + if normFlowFuture != nil { + err = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{ + Done: true, + SyncBatchID: -1, + }).Get(ctx, nil) + if err != nil { + logger.Warn("failed to signal normalize done, finishing", slog.Any("error", err)) + finished = true + } } }) mainLoopSelector.AddFuture(normFlowFuture, func(f workflow.Future) {