diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e900ca0352..df07450370 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -442,7 +442,7 @@ func CDCFlowWorkflowWithConfig( } } - batchSizeSelector.Select(ctx) + batchSizeSelector.Select(ctx) } workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true) diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 734043c698..4c2027ba3f 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -55,18 +55,9 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync") stopLoop := false - for !stopLoop { - var stopLoopVal bool - var anyFalse bool - syncChan.Receive(normalizeFlowCtx, &stopLoopVal) - stopLoop = stopLoop || stopLoopVal - anyFalse = anyFalse || !stopLoopVal - for syncChan.ReceiveAsync(&stopLoopVal) { - stopLoop = stopLoop || stopLoopVal - anyFalse = anyFalse || !stopLoopVal - } - - if anyFalse { + needSync := true + for { + if needSync { startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: config, } @@ -78,7 +69,22 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( } result = append(result, *normalizeResponse) } - } - return result, nil + if !stopLoop { + var stopLoopVal bool + syncChan.Receive(normalizeFlowCtx, &stopLoopVal) + stopLoop = stopLoop || stopLoopVal + needSync = !stopLoopVal + for syncChan.ReceiveAsync(&stopLoopVal) { + stopLoop = stopLoop || stopLoopVal + needSync = needSync || !stopLoopVal + } + } else { + needSync = false + } + + if stopLoop && !needSync { + return result, nil + } + } }