Skip to content

Commit

Permalink
fixed handling of selector
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 29, 2024
1 parent 2cc4d7a commit c16ae28
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
}

// finished processing, wipe it
state.FlowConfigUpdate = nil
}
Expand Down Expand Up @@ -299,7 +303,7 @@ func CDCFlowWorkflow(

for state.ActiveSignal == model.PauseSignal {
// only place we block on receive, so signal processing is immediate
for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil {
for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil {
w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
selector.Select(ctx)
}
Expand Down

0 comments on commit c16ae28

Please sign in to comment.