diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 419b11b7e..271319006 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -108,7 +108,9 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte mirrorNameSearch map[string]interface{}, ) error { flowConfigUpdate := state.FlowConfigUpdate + if flowConfigUpdate != nil { + w.logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate)) if len(flowConfigUpdate.AdditionalTables) == 0 { return nil } @@ -118,6 +120,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte } state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT + w.logger.Info("altering publication for additional tables") alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -130,6 +133,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte return err } + w.logger.Info("additional tables added to publication") additionalTablesUUID := GetUUID(ctx) childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID) additionalTablesCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) @@ -163,9 +167,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping) state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) - - // finished processing, wipe it - state.FlowConfigUpdate = nil + w.logger.Info("additional tables added to sync flow") } return nil } @@ -268,6 +270,9 @@ func CDCFlowWorkflow( if err != nil { return state, err } + w.logger.Info("wiping flow state after state update processing") + // finished processing, wipe it + state.FlowConfigUpdate = nil state.ActiveSignal = model.NoopSignal } }