From 240b5e380bc41e70865952d2a416ecf6f22582ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 5 Mar 2024 16:40:53 +0000 Subject: [PATCH] don't signal normFlowFuture if normFlowFuture already done when sync flow finishes --- flow/workflows/cdc_flow.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 3adddb0af9..14ca09092d 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -428,13 +428,15 @@ func CDCFlowWorkflow( logger.Info("sync finished, finishing normalize") syncFlowFuture = nil restart = true - err = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{ - Done: true, - SyncBatchID: -1, - }).Get(ctx, nil) - if err != nil { - logger.Warn("failed to signal normalize done, finishing", slog.Any("error", err)) - finished = true + if normFlowFuture != nil { + err = model.NormalizeSignal.SignalChildWorkflow(ctx, normFlowFuture, model.NormalizePayload{ + Done: true, + SyncBatchID: -1, + }).Get(ctx, nil) + if err != nil { + logger.Warn("failed to signal normalize done, finishing", slog.Any("error", err)) + finished = true + } } }) mainLoopSelector.AddFuture(normFlowFuture, func(f workflow.Future) {