From 2ea6dbd8a5b1a419d1ff21c55a35fd0a544b747b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 6 Jan 2024 15:50:21 +0000 Subject: [PATCH] Revert "test reverting WaitForCancellation: true" This reverts commit ae100424874c76be9a9aa9cf2f7fbfa2e04ea931. --- flow/workflows/cdc_flow.go | 14 +++++++++----- flow/workflows/sync_flow.go | 3 +++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 2b6e78aee3..35a2f2c613 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -216,7 +216,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) @@ -242,8 +243,9 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, - SearchAttributes: mirrorNameSearch, + TaskQueue: taskQueue, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) @@ -392,7 +394,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping @@ -464,7 +467,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 09849b1752..0b82ca4c22 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -41,6 +41,7 @@ func (s *SyncFlowExecution) executeSyncFlow( syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, + WaitForCancellation: true, }) // execute GetLastSyncedID on destination peer @@ -65,6 +66,7 @@ func (s *SyncFlowExecution) executeSyncFlow( startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: 30 * time.Second, + WaitForCancellation: true, }) // execute StartFlow on the peers to start the flow @@ -83,6 +85,7 @@ func (s *SyncFlowExecution) executeSyncFlow( replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 30 * time.Minute, + WaitForCancellation: true, }) replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ FlowConnectionConfigs: config,