Skip to content

Commit

Permalink
Merge branch 'main' into snapshot-status-dynadd
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Feb 29, 2024
2 parents 0662890 + f9c883e commit 7b1aaca
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
18 changes: 15 additions & 3 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func GetChildWorkflowID(
type CDCFlowWorkflowResult = CDCFlowWorkflowState

const (
maxSyncsPerCdcFlow = 60
maxSyncsPerCdcFlow = 32
)

func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context,
Expand Down Expand Up @@ -205,6 +205,10 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
}

// finished processing, wipe it
state.FlowConfigUpdate = nil
}
Expand Down Expand Up @@ -300,7 +304,7 @@ func CDCFlowWorkflow(

for state.ActiveSignal == model.PauseSignal {
// only place we block on receive, so signal processing is immediate
for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil {
for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil {
w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
selector.Select(ctx)
}
Expand Down Expand Up @@ -595,13 +599,21 @@ func CDCFlowWorkflow(
}

if restart {
if state.ActiveSignal == model.PauseSignal {
finished = true
}

for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) {
mainLoopSelector.Select(ctx)
}

if err := ctx.Err(); err != nil {
w.logger.Info("mirror canceled", slog.Any("error", err))
return state, err
return nil, err
}

// important to control the size of inputs.
state.TruncateProgress(w.logger)
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

const (
maxSyncsPerSyncFlow = 72
maxSyncsPerSyncFlow = 64
)

func SyncFlowWorkflow(
Expand Down

0 comments on commit 7b1aaca

Please sign in to comment.