diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 16d5c371f3..c9825d4e80 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -141,7 +141,7 @@ func GetChildWorkflowID( type CDCFlowWorkflowResult = CDCFlowWorkflowState const ( - maxSyncsPerCdcFlow = 60 + maxSyncsPerCdcFlow = 32 ) func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context, @@ -205,6 +205,10 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) + if w.syncFlowFuture != nil { + _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) + } + // finished processing, wipe it state.FlowConfigUpdate = nil } @@ -300,7 +304,7 @@ func CDCFlowWorkflow( for state.ActiveSignal == model.PauseSignal { // only place we block on receive, so signal processing is immediate - for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil { + for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil { w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) selector.Select(ctx) } @@ -595,13 +599,21 @@ func CDCFlowWorkflow( } if restart { + if state.ActiveSignal == model.PauseSignal { + finished = true + } + for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) { mainLoopSelector.Select(ctx) } + if err := ctx.Err(); err != nil { w.logger.Info("mirror canceled", slog.Any("error", err)) - return state, err + return nil, err } + + // important to control the size of inputs. + state.TruncateProgress(w.logger) return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 6d2c9def50..2ce225f260 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -14,7 +14,7 @@ import ( ) const ( - maxSyncsPerSyncFlow = 72 + maxSyncsPerSyncFlow = 64 ) func SyncFlowWorkflow(