diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 6d5d68f8fd..31db45788d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -1255,16 +1255,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { sentUpdate = true return true }) - }, 28*time.Second) - env.RegisterDelayedCallback(func() { - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool { - if !sentUpdate { - return false - } - e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal) - s.t.Log("Sent resume signal") - return true - }) }, 56*time.Second) go func() { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0fa1110a85..6c1c76cb28 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct { RelationMessageMapping model.RelationMessageMapping CurrentFlowStatus protos.FlowStatus // flow config update request, set to nil after processed - FlowConfigUpdates []*protos.CDCFlowConfigUpdate + FlowConfigUpdate *protos.CDCFlowConfigUpdate // options passed to all SyncFlows SyncFlowOptions *protos.SyncFlowOptions } @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow SyncFlowErrors: nil, NormalizeFlowErrors: nil, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, - FlowConfigUpdates: nil, + FlowConfigUpdate: nil, SyncFlowOptions: &protos.SyncFlowOptions{ BatchSize: cfg.MaxBatchSize, IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, @@ -144,17 +144,18 @@ const ( maxSyncsPerCdcFlow = 60 ) -func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context, +func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context, cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, mirrorNameSearch map[string]interface{}, ) error { - for _, flowConfigUpdate := range state.FlowConfigUpdates { + flowConfigUpdate := state.FlowConfigUpdate + if flowConfigUpdate != nil { if len(flowConfigUpdate.AdditionalTables) == 0 { - continue + return nil } if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) { w.logger.Warn("duplicate source/destination tables found in additionalTables") - continue + return nil } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -200,12 +201,16 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping) maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping) - maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping) state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) + + if w.syncFlowFuture != nil { + _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) + } + + // finished processing, wipe it + state.FlowConfigUpdate = nil } - // finished processing, wipe it - state.FlowConfigUpdates = nil return nil } @@ -223,9 +228,8 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener( if cdcConfigUpdate.IdleTimeout > 0 { state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } + // do this irrespective of additional tables being present, for auto unpausing + state.FlowConfigUpdate = cdcConfigUpdate if w.syncFlowFuture != nil { _ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil) @@ -299,7 +303,7 @@ func CDCFlowWorkflow( for state.ActiveSignal == model.PauseSignal { // only place we block on receive, so signal processing is immediate - for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil { + for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil { w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) selector.Select(ctx) } @@ -307,9 +311,12 @@ func CDCFlowWorkflow( return state, err } - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) - if err != nil { - return state, err + if state.FlowConfigUpdate != nil { + err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch) + if err != nil { + return state, err + } + state.ActiveSignal = model.NoopSignal } } diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 1d9c6aa850..4cff1b4538 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -114,6 +114,10 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { } }; + const isNotPaused = + mirrorState.currentFlowState.toString() !== + FlowStatus[FlowStatus.STATUS_PAUSED]; + return (
@@ -178,6 +182,11 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { omitAdditionalTablesMapping={omitAdditionalTablesMapping} /> + {isNotPaused ? ( + + ) : ( + + )}
+ ); };