Skip to content

Commit

Permalink
Fix normalize-done race causing sync/normalize to execute in parallel…
Browse files Browse the repository at this point in the history
… after pause/unpause

Fixes #1448
  • Loading branch information
serprex committed Mar 8, 2024
1 parent cf04650 commit 7a9d5f0
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 1 deletion.
5 changes: 5 additions & 0 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool) {
return result, ok, more
}

func (self TypedReceiveChannel[T]) Drain() {
for self.Chan.ReceiveAsync(nil) {
}
}

func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f func(T, bool)) workflow.Selector {
return selector.AddReceive(self.Chan, func(c workflow.ReceiveChannel, more bool) {
var result T
Expand Down
1 change: 1 addition & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ func CDCFlowWorkflow(
})
if !parallel {
normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
normDoneChan.Drain()
normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) {
if syncFlowFuture != nil {
_ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil)
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NormalizeFlowWorkflow(
}
}

if !state.Stop {
if ctx.Err() == nil && !state.Stop {
parallel := GetSideEffect(ctx, func(_ workflow.Context) bool {
return peerdbenv.PeerDBEnableParallelSyncNormalize()
})
Expand Down
1 change: 1 addition & 0 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func SyncFlowWorkflow(
waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait")
waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
waitChan.Drain()
waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {})
stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {
stop = true
Expand Down

0 comments on commit 7a9d5f0

Please sign in to comment.