diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 55751bd096..c0e0ad8559 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -37,7 +37,7 @@ type CDCFlowWorkflowState struct { // Accumulates status for sync flows spawned. SyncFlowStatuses []*model.SyncResponse // Accumulates status for sync flows spawned. - NormalizeFlowStatuses []*model.NormalizeResponse + NormalizeFlowStatuses []model.NormalizeResponse // Current signalled state of the peer flow. ActiveSignal shared.CDCFlowSignal // SetupComplete indicates whether the peer flow setup has completed. @@ -300,6 +300,30 @@ func CDCFlowWorkflowWithConfig( currentSyncFlowNum := 0 totalRecordsSynced := 0 + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) + if err != nil { + return state, err + } + + childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: normalizeFlowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 20, + }, + SearchAttributes: mirrorNameSearch, + } + normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) + childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( + normCtx, + NormalizeFlowWorkflow, + cfg, + ) + var normExecution workflow.Execution + if err := childNormalizeFlowFuture.GetChildWorkflowExecution().Get(ctx, &normExecution); err != nil { + return state, fmt.Errorf("normalize workflow failed to start: %w", err) + } + for { // check and act on signals before a fresh flow starts. w.receiveAndHandleSignalAsync(ctx, state) @@ -320,6 +344,7 @@ func CDCFlowWorkflowWithConfig( } // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true) w.logger.Info("peer flow has been shutdown") return state, nil } @@ -342,6 +367,7 @@ func CDCFlowWorkflowWithConfig( syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName) if err != nil { + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true) return state, err } @@ -376,6 +402,7 @@ func CDCFlowWorkflowWithConfig( } w.logger.Info("Total records synced: ", totalRecordsSynced) + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", false) var tableSchemaDeltas []*protos.TableSchemaDelta = nil if childSyncFlowRes != nil { @@ -412,36 +439,18 @@ func CDCFlowWorkflowWithConfig( } } - normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) - if err != nil { - return state, err - } - - childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: normalizeFlowID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 20, - }, - SearchAttributes: mirrorNameSearch, - } - normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) - childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( - normCtx, - NormalizeFlowWorkflow, - cfg, - ) - - var childNormalizeFlowRes *model.NormalizeResponse - if err := childNormalizeFlowFuture.Get(normCtx, &childNormalizeFlowRes); err != nil { - w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) - } else { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) - } batchSizeSelector.Select(ctx) } + workflow.SignalExternalWorkflow(ctx, normExecution.ID, normExecution.RunID, "Sync", true) + var childNormalizeFlowRes []model.NormalizeResponse + if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { + w.logger.Error("failed to execute normalize flow: ", err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) + } else { + state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes...) + } + state.TruncateProgress(w.logger) return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 39256eac1a..04503651b4 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -31,7 +31,7 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) func NormalizeFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, -) (*model.NormalizeResponse, error) { +) ([]model.NormalizeResponse, error) { s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ CDCFlowName: config.FlowJobName, Progress: []string{}, @@ -43,7 +43,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context, func (s *NormalizeFlowExecution) executeNormalizeFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, -) (*model.NormalizeResponse, error) { +) ([]model.NormalizeResponse, error) { s.logger.Info("executing normalize flow - ", s.CDCFlowName) normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -51,15 +51,34 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( HeartbeatTimeout: 5 * time.Minute, }) - startNormalizeInput := &protos.StartNormalizeInput{ - FlowConnectionConfigs: config, - } - fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) + result := make([]model.NormalizeResponse, 0) + syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync") + + stopLoop := false + for stopLoop { + var stopLoopVal bool + var anyFalse bool + syncChan.Receive(normalizeFlowCtx, &stopLoopVal) + stopLoop = stopLoop || stopLoopVal + anyFalse = anyFalse || !stopLoopVal + for syncChan.ReceiveAsync(&stopLoopVal) { + stopLoop = stopLoop || stopLoopVal + anyFalse = anyFalse || !stopLoopVal + } + + if anyFalse { + startNormalizeInput := &protos.StartNormalizeInput{ + FlowConnectionConfigs: config, + } + fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput) - var normalizeResponse *model.NormalizeResponse - if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { - return nil, fmt.Errorf("failed to flow: %w", err) + var normalizeResponse *model.NormalizeResponse + if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil { + return result, fmt.Errorf("failed to flow: %w", err) + } + result = append(result, *normalizeResponse) + } } - return normalizeResponse, nil + return result, nil }