Skip to content

Commit

Permalink
idea: after sync flow error, normalize must still be waited upon
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 27, 2023
1 parent 07c96bc commit e58a25e
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,17 @@ func CDCFlowWorkflowWithConfig(
return state, fmt.Errorf("normalize workflow failed to start: %w", err)
}

finishNormalize := func() {
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
var childNormalizeFlowRes []model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes...)
}
}

for {
// check and act on signals before a fresh flow starts.
w.receiveAndHandleSignalAsync(ctx, state)
Expand All @@ -347,7 +358,7 @@ func CDCFlowWorkflowWithConfig(
}
// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
finishNormalize()
w.logger.Info("peer flow has been shutdown")
return state, nil
}
Expand All @@ -370,7 +381,7 @@ func CDCFlowWorkflowWithConfig(

syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName)
if err != nil {
workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
finishNormalize()
return state, err
}

Expand Down Expand Up @@ -445,15 +456,7 @@ func CDCFlowWorkflowWithConfig(
batchSizeSelector.Select(ctx)
}

workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true)
var childNormalizeFlowRes []model.NormalizeResponse
if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes...)
}

finishNormalize()
state.TruncateProgress(w.logger)
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}

0 comments on commit e58a25e

Please sign in to comment.