From 1be95d5a6e690c3e843b15c41546077e86cea415 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 20 Jan 2024 02:18:29 +0000 Subject: [PATCH] wait for SyncDone outside main selector loop --- flow/workflows/cdc_flow.go | 20 ++++++-------------- flow/workflows/normalize_flow.go | 16 +++++++--------- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 4e1541ee7d..214ef1044f 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -452,7 +452,7 @@ func CDCFlowWorkflowWithConfig( } } - var canceled, normDone bool + var canceled bool signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) mainLoopSelector := workflow.NewSelector(ctx) mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { @@ -463,13 +463,6 @@ func CDCFlowWorkflowWithConfig( c.ReceiveAsync(&signalVal) state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) }) - if normWaitChan != nil { - mainLoopSelector.AddReceive(normWaitChan, func(c workflow.ReceiveChannel, _ bool) { - var signalVal struct{} - c.ReceiveAsync(&signalVal) - normDone = true - }) - } for { for !canceled && mainLoopSelector.HasPending() { @@ -560,7 +553,7 @@ func CDCFlowWorkflowWithConfig( var syncDone bool var normalizeSignalError error - normDone = normWaitChan == nil + normDone := normWaitChan == nil mainLoopSelector.AddFuture(childSyncFlowFuture, func(f workflow.Future) { syncDone = true @@ -622,11 +615,8 @@ func CDCFlowWorkflowWithConfig( } }) - for !syncDone && !normDone && !canceled && state.ActiveSignal != shared.ShutdownSignal { + for !syncDone && !canceled && state.ActiveSignal != shared.ShutdownSignal { mainLoopSelector.Select(ctx) - if childNormalizeFlowFuture.IsReady() { - normDone = true - } } if canceled { break @@ -637,10 +627,12 @@ func CDCFlowWorkflowWithConfig( state.CurrentFlowStatus = protos.FlowStatus_STATUS_TERMINATED return state, nil } - if normalizeSignalError != nil { return state, normalizeSignalError } + if !normDone { + normWaitChan.Receive(ctx, nil) + } } finishNormalize() diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 4f06707c19..ea8b574aa8 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -76,15 +76,13 @@ func NormalizeFlowWorkflow(ctx workflow.Context, if !peerdbenv.PeerDBEnableParallelSyncNormalize() { parent := workflow.GetInfo(ctx).ParentWorkflowExecution - if parent != nil { - workflow.SignalExternalWorkflow( - ctx, - parent.ID, - parent.RunID, - "SyncDone", - struct{}{}, - ) - } + workflow.SignalExternalWorkflow( + ctx, + parent.ID, + parent.RunID, + "SyncDone", + struct{}{}, + ) } }