From d16da9c24c1ac5ad74d6193aa1dc601f6d44e567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 23 Feb 2024 15:52:48 +0000 Subject: [PATCH] fix sync_flow --- flow/workflows/sync_flow.go | 58 ++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 99961c42ba..341051d15a 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -69,6 +69,11 @@ func SyncFlowWorkflow( }) sessionSelector.Select(ctx) if sessionError != nil { + if ctx.Err() != nil { + return ctx.Err() + } + logger.Error("error starting session, retry in 1 second", slog.Any("error", sessionError)) + _ = workflow.Sleep(ctx, time.Second) return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options) } @@ -87,37 +92,25 @@ func SyncFlowWorkflow( }) for !stopLoop { - selector.Select(ctx) - for !canceled && selector.HasPending() { - selector.Select(ctx) - } - if canceled { - logger.Info("sync canceled") + if ctx.Err() != nil { + canceled = true break } - // check if total sync flows have been completed - // since this happens immediately after we check for signals, the case of a signal being missed - // due to a new workflow starting is vanishingly low, but possible - if currentSyncFlowNum == maxSyncsPerFlow { - return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options) - } + var syncDone, syncErr bool + mustWait := waitChan.Chan != nil + + // execute the sync flow currentSyncFlowNum += 1 logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) - // execute the sync flow syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: time.Minute, WaitForCancellation: true, }) - logger.Info("executing sync flow") syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, config, options, sessionInfo.SessionID) - - var syncDone, syncErr bool - var normalizeSignalError error - mustWait := waitChan.Chan != nil selector.AddFuture(syncFlowFuture, func(f workflow.Future) { syncDone = true @@ -193,27 +186,40 @@ func SyncFlowWorkflow( TableNameSchemaMapping: options.TableNameSchemaMapping, }, ) - normalizeSignalError = signalFuture.Get(ctx, nil) + err = signalFuture.Get(ctx, nil) + if err != nil { + logger.Error("failed to trigger normalize, next sync", slog.Any("error", err)) + mustWait = false + } } else { mustWait = false } }) - for !syncDone && !canceled { + for !canceled && !syncDone && !selector.HasPending() { selector.Select(ctx) } if canceled { break } - if syncErr { - return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options) - } - if normalizeSignalError != nil { - return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options) - } + + restart := currentSyncFlowNum == maxSyncsPerFlow || syncErr if mustWait { waitChan.Receive(ctx) + if restart { + // must flush selector for signals received while waiting + for !canceled && !selector.HasPending() { + selector.Select(ctx) + } + break + } + } else if restart { + break } } + if canceled { + logger.Info("sync canceled") + return ctx.Err() + } return workflow.NewContinueAsNewError(ctx, SyncFlowWorkflow, config, options) }