diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d81a418320..55751bd096 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -156,6 +156,7 @@ func CDCFlowWorkflowWithConfig( return nil, fmt.Errorf("invalid connection configs") } + ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) w := NewCDCFlowWorkflowExecution(ctx) if limits.TotalSyncFlows == 0 { @@ -170,6 +171,10 @@ func CDCFlowWorkflowWithConfig( return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err) } + mirrorNameSearch := map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + } + // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead @@ -185,10 +190,6 @@ func CDCFlowWorkflowWithConfig( } } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } - // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName) @@ -204,7 +205,6 @@ func CDCFlowWorkflowWithConfig( SearchAttributes: mirrorNameSearch, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) - setupFlowCtx = workflow.WithValue(setupFlowCtx, "flowName", cfg.FlowJobName) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) @@ -232,7 +232,6 @@ func CDCFlowWorkflowWithConfig( SearchAttributes: mirrorNameSearch, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) - snapshotFlowCtx = workflow.WithValue(snapshotFlowCtx, "flowName", cfg.FlowJobName) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) @@ -266,7 +265,6 @@ func CDCFlowWorkflowWithConfig( StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: 1 * time.Hour, }) - renameTablesCtx = workflow.WithValue(renameTablesCtx, "flowName", cfg.FlowJobName) renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { return state, fmt.Errorf("failed to execute rename tables activity: %w", err) @@ -347,9 +345,6 @@ func CDCFlowWorkflowWithConfig( return state, err } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } // execute the sync flow as a child workflow childSyncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, @@ -359,18 +354,17 @@ func CDCFlowWorkflowWithConfig( }, SearchAttributes: mirrorNameSearch, } - ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts) - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) + syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( - ctx, + syncCtx, SyncFlowWorkflow, cfg, syncFlowOptions, ) var childSyncFlowRes *model.SyncResponse - if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil { + if err := childSyncFlowFuture.Get(syncCtx, &childSyncFlowRes); err != nil { w.logger.Error("failed to execute sync flow: ", err) state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else { @@ -383,20 +377,6 @@ func CDCFlowWorkflowWithConfig( w.logger.Info("Total records synced: ", totalRecordsSynced) - normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) - if err != nil { - return state, err - } - - childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: normalizeFlowID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 20, - }, - SearchAttributes: mirrorNameSearch, - } - ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) var tableSchemaDeltas []*protos.TableSchemaDelta = nil if childSyncFlowRes != nil { tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas @@ -415,7 +395,6 @@ func CDCFlowWorkflowWithConfig( getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) - getModifiedSchemaCtx = workflow.WithValue(getModifiedSchemaCtx, "flowName", cfg.FlowJobName) getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, &protos.GetTableSchemaBatchInput{ PeerConnectionConfig: cfg.Source, @@ -432,15 +411,29 @@ func CDCFlowWorkflowWithConfig( } } } - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) + + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) + if err != nil { + return state, err + } + + childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: normalizeFlowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 20, + }, + SearchAttributes: mirrorNameSearch, + } + normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( - ctx, + normCtx, NormalizeFlowWorkflow, cfg, ) var childNormalizeFlowRes *model.NormalizeResponse - if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { + if err := childNormalizeFlowFuture.Get(normCtx, &childNormalizeFlowRes); err != nil { w.logger.Error("failed to execute normalize flow: ", err) state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) } else { diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index af14e11b8f..39256eac1a 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -51,7 +51,6 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( HeartbeatTimeout: 5 * time.Minute, }) - // execute StartFlow on the peers to start the flow startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: config, }