Skip to content

Commit

Permalink
Fix normalize-done race causing sync/normalize to execute in parallel…
Browse files Browse the repository at this point in the history
… after pause/resume (#1452)

Fixes #1448

Solves via two redundant angles:
1. drain normalize-done at start
2. don't send normalize-done when `ctx.Err() != nil`

The race was consistently happening when normalize received cancel while
in normalize activity. It'd then send normalize-done to parent, who
already having returned continue-as-new but waiting on children to
finish, would have the signal buffer for its next run

Co-authored-by: Kaushik Iska <[email protected]>
  • Loading branch information
serprex and iskakaushik authored Mar 9, 2024
1 parent 6a66489 commit 120786f
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 1 deletion.
5 changes: 5 additions & 0 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool) {
return result, ok, more
}

func (self TypedReceiveChannel[T]) Drain() {
for self.Chan.ReceiveAsync(nil) {
}
}

func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f func(T, bool)) workflow.Selector {
return selector.AddReceive(self.Chan, func(c workflow.ReceiveChannel, more bool) {
var result T
Expand Down
1 change: 1 addition & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ func CDCFlowWorkflow(
})
if !parallel {
normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
normDoneChan.Drain()
normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) {
if syncFlowFuture != nil {
_ = model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x).Get(ctx, nil)
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NormalizeFlowWorkflow(
}
}

if !state.Stop {
if ctx.Err() == nil && !state.Stop {
parallel := GetSideEffect(ctx, func(_ workflow.Context) bool {
return peerdbenv.PeerDBEnableParallelSyncNormalize()
})
Expand Down
1 change: 1 addition & 0 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func SyncFlowWorkflow(
waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait")
waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
waitChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
waitChan.Drain()
waitChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {})
stopChan.AddToSelector(waitSelector, func(_ struct{}, _ bool) {
stop = true
Expand Down

0 comments on commit 120786f

Please sign in to comment.