Skip to content

Commit

Permalink
wait for SyncDone outside main selector loop
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 20, 2024
1 parent 746f4dc commit 1be95d5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 23 deletions.
20 changes: 6 additions & 14 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func CDCFlowWorkflowWithConfig(
}
}

var canceled, normDone bool
var canceled bool
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
mainLoopSelector := workflow.NewSelector(ctx)
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
Expand All @@ -463,13 +463,6 @@ func CDCFlowWorkflowWithConfig(
c.ReceiveAsync(&signalVal)
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
})
if normWaitChan != nil {
mainLoopSelector.AddReceive(normWaitChan, func(c workflow.ReceiveChannel, _ bool) {
var signalVal struct{}
c.ReceiveAsync(&signalVal)
normDone = true
})
}

for {
for !canceled && mainLoopSelector.HasPending() {
Expand Down Expand Up @@ -560,7 +553,7 @@ func CDCFlowWorkflowWithConfig(

var syncDone bool
var normalizeSignalError error
normDone = normWaitChan == nil
normDone := normWaitChan == nil
mainLoopSelector.AddFuture(childSyncFlowFuture, func(f workflow.Future) {
syncDone = true

Expand Down Expand Up @@ -622,11 +615,8 @@ func CDCFlowWorkflowWithConfig(
}
})

for !syncDone && !normDone && !canceled && state.ActiveSignal != shared.ShutdownSignal {
for !syncDone && !canceled && state.ActiveSignal != shared.ShutdownSignal {
mainLoopSelector.Select(ctx)
if childNormalizeFlowFuture.IsReady() {
normDone = true
}
}
if canceled {
break
Expand All @@ -637,10 +627,12 @@ func CDCFlowWorkflowWithConfig(
state.CurrentFlowStatus = protos.FlowStatus_STATUS_TERMINATED
return state, nil
}

if normalizeSignalError != nil {
return state, normalizeSignalError
}
if !normDone {
normWaitChan.Receive(ctx, nil)
}
}

finishNormalize()
Expand Down
16 changes: 7 additions & 9 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,13 @@ func NormalizeFlowWorkflow(ctx workflow.Context,

if !peerdbenv.PeerDBEnableParallelSyncNormalize() {
parent := workflow.GetInfo(ctx).ParentWorkflowExecution
if parent != nil {
workflow.SignalExternalWorkflow(
ctx,
parent.ID,
parent.RunID,
"SyncDone",
struct{}{},
)
}
workflow.SignalExternalWorkflow(
ctx,
parent.ID,
parent.RunID,
"SyncDone",
struct{}{},
)
}
}

Expand Down

0 comments on commit 1be95d5

Please sign in to comment.