Skip to content

Commit

Permalink
PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 19, 2024
1 parent f2a5b7f commit 946565f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
5 changes: 5 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,8 @@ func PeerDBCatalogDatabase() string {
func PeerDBEnableWALHeartbeat() bool {
return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
}

// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
func PeerDBEnableParallelSyncNormalize() bool {
return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
}
19 changes: 16 additions & 3 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -427,6 +428,11 @@ func CDCFlowWorkflowWithConfig(
normalizeFlowOptions,
)

var normWaitChan workflow.ReceiveChannel
if peerdbenv.PeerDBEnableParallelSyncNormalize() {
normWaitChan = workflow.GetSignalChannel(ctx, "SyncDone")
}

finishNormalize := func() {
childNormalizeFlowFuture.SignalChildWorkflow(ctx, "Sync", model.NormalizeSignal{Done: true})
var childNormalizeFlowRes *model.NormalizeFlowResponse
Expand All @@ -443,14 +449,19 @@ func CDCFlowWorkflowWithConfig(
}
}

var canceled bool
var canceled, normDone bool
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
mainLoopSelector := workflow.NewSelector(ctx)
mainLoopSelector.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
var signalVal shared.CDCFlowSignal
c.ReceiveAsync(&signalVal)
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
})
mainLoopSelector.AddReceive(normWaitChan, func(c workflow.ReceiveChannel, _ bool) {
var signalVal interface{}
c.ReceiveAsync(&signalVal)
normDone = true
})
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})
Expand Down Expand Up @@ -544,6 +555,7 @@ func CDCFlowWorkflowWithConfig(

var syncDone bool
var normalizeSignalError error
normDone = normWaitChan == nil
mainLoopSelector.AddFuture(childSyncFlowFuture, func(f workflow.Future) {
syncDone = true

Expand Down Expand Up @@ -599,12 +611,13 @@ func CDCFlowWorkflowWithConfig(
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
})

normalizeSignalError = signalFuture.Get(ctx, nil)
} else {
normDone = true
}
})

for !syncDone && !canceled && state.ActiveSignal != shared.ShutdownSignal {
for !syncDone && !normDone && !canceled && state.ActiveSignal != shared.ShutdownSignal {
mainLoopSelector.Select(ctx)
}
if canceled {
Expand Down
12 changes: 12 additions & 0 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"go.temporal.io/sdk/workflow"
)

Expand Down Expand Up @@ -71,6 +72,17 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
} else if normalizeResponse != nil {
results = append(results, *normalizeResponse)
}

if peerdbenv.PeerDBEnableParallelSyncNormalize() {
parent := workflow.GetInfo(ctx).ParentWorkflowExecution
workflow.SignalExternalWorkflow(
ctx,
parent.ID,
parent.RunID,
"SyncDone",
nil,
)
}
}

return &model.NormalizeFlowResponse{
Expand Down

0 comments on commit 946565f

Please sign in to comment.