From 7a9d5f009cfe59ee562f8c843649203957347ddd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 8 Mar 2024 22:24:46 +0000 Subject: [PATCH] Fix normalize-done race causing sync/normalize to execute in parallel after pause/unpause Fixes #1448 --- flow/model/signals.go | 5 +++++ flow/workflows/cdc_flow.go | 1 + flow/workflows/normalize_flow.go | 2 +- flow/workflows/sync_flow.go | 1 + 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/flow/model/signals.go b/flow/model/signals.go index 5cdd9f2264..53fcf7e1e5 100644 --- a/flow/model/signals.go +++ b/flow/model/signals.go @@ -84,6 +84,11 @@ func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool) { return result, ok, more } +func (self TypedReceiveChannel[T]) Drain() { + for self.Chan.ReceiveAsync(nil) { + } +} + func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f func(T, bool)) workflow.Selector { return selector.AddReceive(self.Chan, func(c workflow.ReceiveChannel, more bool) { var result T diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 93e1d4659b..c3de6393de 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -479,6 +479,7 @@ func CDCFlowWorkflow( }) if !parallel { normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) + normDoneChan.Drain() normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) { if syncFlowFuture != nil { _ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil) diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index adc7a7991a..949d1016dd 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -104,7 +104,7 @@ func NormalizeFlowWorkflow( } } - if !state.Stop { + if ctx.Err() == nil && !state.Stop { parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { return peerdbenv.PeerDBEnableParallelSyncNormalize() }) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 0dd5a1e4d9..bc21c03596 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -79,6 +79,7 @@ func SyncFlowWorkflow( waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) + waitChan.Drain() waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {}) stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) { stop = true