diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 73bc4f87d9..72a07d95e5 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -226,14 +226,11 @@ func CDCFlowWorkflow( selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignalProperties, _ bool) { cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger) - slog.Info("value of signal", slog.Any("signal", cdcFlowData.Signal)) - slog.Info("cdc signal val", slog.Any("val", val)) state.ActiveSignal = cdcFlowData.Signal syncCountLimit = val.CustomNumberOfSyncs if syncCountLimit <= 0 { syncCountLimit = MaxSyncsPerCdcFlow } - slog.Info("sync limit reception inside pause", slog.Int("limit", syncCountLimit)) }) addCdcPropertiesSignalListener(ctx, logger, selector, state) @@ -451,12 +448,6 @@ func CDCFlowWorkflow( } logger.Info("normalize finished, finishing") - if syncCount == int(syncCountLimit) { - logger.Info("sync count limit reached, pausing", - slog.Int("limit", syncCountLimit), - slog.Int("count", syncCount)) - state.ActiveSignal = model.PauseSignal - } normFlowFuture = nil restart = true finished = true @@ -487,6 +478,12 @@ func CDCFlowWorkflow( normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) normDoneChan.Drain() normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { + if syncCount == int(syncCountLimit) { + logger.Info("sync count limit reached, pausing", + slog.Int("limit", syncCountLimit), + slog.Int("count", syncCount)) + state.ActiveSignal = model.PauseSignal + } if syncFlowFuture != nil { _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil) }