diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 2b6e78aee3..35a2f2c613 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -216,7 +216,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) @@ -242,8 +243,9 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, - SearchAttributes: mirrorNameSearch, + TaskQueue: taskQueue, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) @@ -392,7 +394,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping @@ -464,7 +467,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 09849b1752..0b82ca4c22 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -41,6 +41,7 @@ func (s *SyncFlowExecution) executeSyncFlow( syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, + WaitForCancellation: true, }) // execute GetLastSyncedID on destination peer @@ -65,6 +66,7 @@ func (s *SyncFlowExecution) executeSyncFlow( startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: 30 * time.Second, + WaitForCancellation: true, }) // execute StartFlow on the peers to start the flow @@ -83,6 +85,7 @@ func (s *SyncFlowExecution) executeSyncFlow( replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 30 * time.Minute, + WaitForCancellation: true, }) replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ FlowConnectionConfigs: config,