Skip to content

Commit

Permalink
Add error handling to session setup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 8, 2024
1 parent 852cbe0 commit 5e47225
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,19 +378,34 @@ func CDCFlowWorkflowWithConfig(
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})
workflow.ExecuteActivity(
fMaintain := workflow.ExecuteActivity(
syncCtx,
flowable.MaintainPull,
cfg,
sessionInfo.SessionID,
)
fWait := workflow.ExecuteActivity(
fSessionSetup := workflow.ExecuteActivity(
syncCtx,
flowable.WaitForSourceConnector,
sessionInfo.SessionID,
)
if err := fWait.Get(syncCtx, nil); err != nil {
return nil, err

var sessionError error
sessionSelector := workflow.NewNamedSelector(ctx, "Session Setup")
sessionSelector.AddFuture(fMaintain, func(f workflow.Future) {
// MaintainPull should never exit without an error before this point
sessionError = f.Get(syncCtx, nil)
})
sessionSelector.AddFuture(fSessionSetup, func(f workflow.Future) {
// Happy path is waiting for this to return without error
sessionError = f.Get(syncCtx, nil)
})
sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
sessionError = ctx.Err()
})
sessionSelector.Select(ctx)
if sessionError != nil {
return nil, sessionError
}

state.SyncFlowOptions = &protos.SyncFlowOptions{
Expand Down Expand Up @@ -452,7 +467,7 @@ func CDCFlowWorkflowWithConfig(

var canceled bool
flowSignalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
mainLoopSelector := workflow.NewSelector(ctx)
mainLoopSelector := workflow.NewNamedSelector(ctx, "Main Loop")
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})
Expand Down

0 comments on commit 5e47225

Please sign in to comment.