Skip to content

Commit

Permalink
fix sync_flow
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 23, 2024
1 parent d56fb3e commit d16da9c
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func SyncFlowWorkflow(
})
sessionSelector.Select(ctx)
if sessionError != nil {
if ctx.Err() != nil {
return ctx.Err()
}
logger.Error("error starting session, retry in 1 second", slog.Any("error", sessionError))
_ = workflow.Sleep(ctx, time.Second)
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options)
}

Expand All @@ -87,37 +92,25 @@ func SyncFlowWorkflow(
})

for !stopLoop {
selector.Select(ctx)
for !canceled && selector.HasPending() {
selector.Select(ctx)
}
if canceled {
logger.Info("sync canceled")
if ctx.Err() != nil {
canceled = true
break
}

// check if total sync flows have been completed
// since this happens immediately after we check for signals, the case of a signal being missed
// due to a new workflow starting is vanishingly low, but possible
if currentSyncFlowNum == maxSyncsPerFlow {
return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options)
}
var syncDone, syncErr bool
mustWait := waitChan.Chan != nil

// execute the sync flow
currentSyncFlowNum += 1
logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum))

// execute the sync flow
syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 72 * time.Hour,
HeartbeatTimeout: time.Minute,
WaitForCancellation: true,
})

logger.Info("executing sync flow")
syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionInfo.SessionID)

var syncDone, syncErr bool
var normalizeSignalError error
mustWait := waitChan.Chan != nil
selector.AddFuture(syncFlowFuture, func(f workflow.Future) {
syncDone = true

Expand Down Expand Up @@ -193,27 +186,40 @@ func SyncFlowWorkflow(
TableNameSchemaMapping: options.TableNameSchemaMapping,
},
)
normalizeSignalError = signalFuture.Get(ctx, nil)
err = signalFuture.Get(ctx, nil)
if err != nil {
logger.Error("failed to trigger normalize, next sync", slog.Any("error", err))
mustWait = false
}
} else {
mustWait = false
}
})

for !syncDone && !canceled {
for !canceled && !syncDone && !selector.HasPending() {
selector.Select(ctx)
}
if canceled {
break
}
if syncErr {
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options)
}
if normalizeSignalError != nil {
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options)
}

restart := currentSyncFlowNum == maxSyncsPerFlow || syncErr
if mustWait {
waitChan.Receive(ctx)
if restart {
// must flush selector for signals received while waiting
for !canceled && !selector.HasPending() {
selector.Select(ctx)
}
break
}
} else if restart {
break
}
}
if canceled {
logger.Info("sync canceled")
return ctx.Err()
}
return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options)
}

0 comments on commit d16da9c

Please sign in to comment.