Skip to content

Commit

Permalink
CDC Flow: fix state wipe (#1434)
Browse files Browse the repository at this point in the history
Pause mirror second time onwards was not being respected as the state
was not being wiped if only idle timeout / batch size is edited, because
when additionTables is empty we just return nil before setting state to
nil. This made flowConfigUpdate perenially not nil, which caused the
mirror to never enter the pause loop.

This PR moves the setting of state to nil outside of
processConfigUpdates and also adds logs
  • Loading branch information
Amogh-Bharadwaj authored Mar 5, 2024
1 parent ba6feaa commit 6612e76
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
mirrorNameSearch map[string]interface{},
) error {
flowConfigUpdate := state.FlowConfigUpdate

if flowConfigUpdate != nil {
w.logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate))
if len(flowConfigUpdate.AdditionalTables) == 0 {
return nil
}
Expand All @@ -118,6 +120,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
}
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT

w.logger.Info("altering publication for additional tables")
alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
Expand All @@ -130,6 +133,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
return err
}

w.logger.Info("additional tables added to publication")
additionalTablesUUID := GetUUID(ctx)
childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID)
additionalTablesCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
Expand Down Expand Up @@ -163,9 +167,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping)

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

// finished processing, wipe it
state.FlowConfigUpdate = nil
w.logger.Info("additional tables added to sync flow")
}
return nil
}
Expand Down Expand Up @@ -268,6 +270,9 @@ func CDCFlowWorkflow(
if err != nil {
return state, err
}
w.logger.Info("wiping flow state after state update processing")
// finished processing, wipe it
state.FlowConfigUpdate = nil
state.ActiveSignal = model.NoopSignal
}
}
Expand Down

0 comments on commit 6612e76

Please sign in to comment.