diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index afc17927ce..40edabea89 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -45,7 +45,7 @@ type CDCFlowWorkflowState struct { SrcTableIdNameMapping map[uint32]string TableNameSchemaMapping map[string]*protos.TableSchema // flow config update request, set to nil after processed - FlowConfigUpdates []*protos.CDCFlowConfigUpdate + FlowConfigUpdate *protos.CDCFlowConfigUpdate // options passed to all SyncFlows SyncFlowOptions *protos.SyncFlowOptions // initially copied from config, all changes are made here though @@ -76,7 +76,7 @@ func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWo CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, SrcTableIdNameMapping: nil, TableNameSchemaMapping: nil, - FlowConfigUpdates: nil, + FlowConfigUpdate: nil, SyncFlowOptions: nil, TableMappings: tableMappings, } @@ -143,13 +143,14 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, mirrorNameSearch map[string]interface{}, ) error { - for _, flowConfigUpdate := range state.FlowConfigUpdates { + flowConfigUpdate := state.FlowConfigUpdate + if flowConfigUpdate != nil { if len(flowConfigUpdate.AdditionalTables) == 0 { - continue + return nil } if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) { w.logger.Warn("duplicate source/destination tables found in additionalTables") - continue + return nil } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -206,7 +207,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...) state.SyncFlowOptions.TableMappings = state.TableMappings // finished processing, wipe it - state.FlowConfigUpdates = nil + state.FlowConfigUpdate = nil } return nil } @@ -444,9 +445,8 @@ func CDCFlowWorkflowWithConfig( if cdcConfigUpdate.IdleTimeout > 0 { state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } + // do this irrespective of additional tables being present, for auto unpausing + state.FlowConfigUpdate = cdcConfigUpdate w.logger.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), @@ -470,11 +470,13 @@ func CDCFlowWorkflowWithConfig( w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate mainLoopSelector.Select(ctx) - if state.ActiveSignal == shared.NoopSignal { + if state.FlowConfigUpdate != nil { err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) if err != nil { return state, err } + // explicitly unpause + state.ActiveSignal = shared.NoopSignal } } diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 1d9c6aa850..4cff1b4538 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -114,6 +114,10 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { } }; + const isNotPaused = + mirrorState.currentFlowState.toString() !== + FlowStatus[FlowStatus.STATUS_PAUSED]; + return (
@@ -178,6 +182,11 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { omitAdditionalTablesMapping={omitAdditionalTablesMapping} /> + {isNotPaused ? ( + + ) : ( + + )}
+ ); };