Skip to content

Commit

Permalink
add sync-stop support on sync_flow end
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 23, 2024
1 parent 6784832 commit e655f5e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
4 changes: 2 additions & 2 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
Name: "cdc-dynamic-properties",
}

var SyncSignal = TypedSignal[bool]{
Name: "sync",
var SyncStopSignal = TypedSignal[struct{}]{
Name: "sync-stop",
}

var SyncErrorSignal = TypedSignal[string]{
Expand Down
8 changes: 4 additions & 4 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func CDCFlowWorkflowWithConfig(
flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger)
if state.ActiveSignal == model.PauseSignal {
model.SyncSignal.SignalChildWorkflow(ctx, syncFlowFuture, true)
model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{})
}
})

Expand Down Expand Up @@ -466,8 +466,8 @@ func CDCFlowWorkflowWithConfig(

if !peerdbenv.PeerDBEnableParallelSyncNormalize() {
normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx)
normDoneChan.AddToSelector(mainLoopSelector, func(_ struct{}, _ bool) {
model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{})
normDoneChan.AddToSelector(mainLoopSelector, func(x struct{}, _ bool) {
model.NormalizeDoneSignal.SignalChildWorkflow(ctx, syncFlowFuture, x)
})
}

Expand Down Expand Up @@ -500,7 +500,7 @@ func CDCFlowWorkflowWithConfig(
mainLoopSelector.Select(ctx)
}
if err := ctx.Err(); err != nil {
model.SyncSignal.SignalChildWorkflow(ctx, syncFlowFuture, true)
model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{})
finishSyncNormalize()
return state, err
}
Expand Down
12 changes: 10 additions & 2 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,19 @@ func SyncFlowWorkflow(
waitChan = model.NormalizeDoneSignal.GetSignalChannel(ctx)
}

var stopLoop bool
var stop bool
currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

selector := workflow.NewNamedSelector(ctx, "Sync Loop")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})

for !stopLoop && ctx.Err() == nil {
stopChan := model.SyncStopSignal.GetSignalChannel(ctx)
stopChan.AddToSelector(selector, func(_ struct{}, _ bool) {
stop = true
})

for !stop && ctx.Err() == nil {
var syncDone, syncErr bool
mustWait := waitChan.Chan != nil

Expand Down Expand Up @@ -214,5 +219,8 @@ func SyncFlowWorkflow(
logger.Info("sync canceled: %v", err)
return err
}
if stop {
return nil
}
return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options)
}

0 comments on commit e655f5e

Please sign in to comment.