Skip to content

Commit

Permalink
fix state wipe
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 5, 2024
1 parent ba6feaa commit 0b27d05
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
mirrorNameSearch map[string]interface{},
) error {
flowConfigUpdate := state.FlowConfigUpdate

if flowConfigUpdate != nil {
w.logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate))
if len(flowConfigUpdate.AdditionalTables) == 0 {
return nil
}
Expand All @@ -118,6 +120,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
}
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT

w.logger.Info("altering publication for additional tables")
alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
Expand All @@ -130,6 +133,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
return err
}

w.logger.Info("additional tables added to publication")
additionalTablesUUID := GetUUID(ctx)
childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID)
additionalTablesCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
Expand Down Expand Up @@ -163,9 +167,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping)

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

// finished processing, wipe it
state.FlowConfigUpdate = nil
w.logger.Info("additional tables added to sync flow")
}
return nil
}
Expand Down Expand Up @@ -268,6 +270,9 @@ func CDCFlowWorkflow(
if err != nil {
return state, err
}
w.logger.Info("wiping flow state after state update processing")
// finished processing, wipe it
state.FlowConfigUpdate = nil
state.ActiveSignal = model.NoopSignal
}
}
Expand Down

0 comments on commit 0b27d05

Please sign in to comment.