diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index ca238899ac..65254de73a 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -74,3 +74,8 @@ func PeerDBCatalogDatabase() string { func PeerDBEnableWALHeartbeat() bool { return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) } + +// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE +func PeerDBEnableParallelSyncNormalize() bool { + return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index f5532f6698..0f76a67386 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -9,6 +9,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" "go.temporal.io/api/enums/v1" @@ -427,6 +428,11 @@ func CDCFlowWorkflowWithConfig( normalizeFlowOptions, ) + var normWaitChan workflow.ReceiveChannel + if peerdbenv.PeerDBEnableParallelSyncNormalize() { + normWaitChan = workflow.GetSignalChannel(ctx, "SyncDone") + } + finishNormalize := func() { childNormalizeFlowFuture.SignalChildWorkflow(ctx, "Sync", model.NormalizeSignal{Done: true}) var childNormalizeFlowRes *model.NormalizeFlowResponse @@ -443,7 +449,7 @@ func CDCFlowWorkflowWithConfig( } } - var canceled bool + var canceled, normDone bool signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) mainLoopSelector := workflow.NewSelector(ctx) mainLoopSelector.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) { @@ -451,6 +457,11 @@ func CDCFlowWorkflowWithConfig( c.ReceiveAsync(&signalVal) state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) }) + mainLoopSelector.AddReceive(normWaitChan, func(c workflow.ReceiveChannel, _ bool) { + var signalVal interface{} + c.ReceiveAsync(&signalVal) + normDone = true + }) mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) @@ -544,6 +555,7 @@ func CDCFlowWorkflowWithConfig( var syncDone bool var normalizeSignalError error + normDone = normWaitChan == nil mainLoopSelector.AddFuture(childSyncFlowFuture, func(f workflow.Future) { syncDone = true @@ -599,12 +611,13 @@ func CDCFlowWorkflowWithConfig( SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, TableNameSchemaMapping: normalizeTableNameSchemaMapping, }) - normalizeSignalError = signalFuture.Get(ctx, nil) + } else { + normDone = true } }) - for !syncDone && !canceled && state.ActiveSignal != shared.ShutdownSignal { + for !syncDone && !normDone && !canceled && state.ActiveSignal != shared.ShutdownSignal { mainLoopSelector.Select(ctx) } if canceled { diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 3ad04b819c..11e07b170d 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -6,6 +6,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" "go.temporal.io/sdk/workflow" ) @@ -71,6 +72,17 @@ func NormalizeFlowWorkflow(ctx workflow.Context, } else if normalizeResponse != nil { results = append(results, *normalizeResponse) } + + if peerdbenv.PeerDBEnableParallelSyncNormalize() { + parent := workflow.GetInfo(ctx).ParentWorkflowExecution + workflow.SignalExternalWorkflow( + ctx, + parent.ID, + parent.RunID, + "SyncDone", + nil, + ) + } } return &model.NormalizeFlowResponse{