From ae100424874c76be9a9aa9cf2f7fbfa2e04ea931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 6 Jan 2024 15:13:41 +0000 Subject: [PATCH] test reverting WaitForCancellation: true --- flow/workflows/cdc_flow.go | 14 +++++--------- flow/workflows/sync_flow.go | 3 --- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 35a2f2c613..2b6e78aee3 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -216,8 +216,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + SearchAttributes: mirrorNameSearch, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) @@ -243,9 +242,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + TaskQueue: taskQueue, + SearchAttributes: mirrorNameSearch, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) @@ -394,8 +392,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + SearchAttributes: mirrorNameSearch, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping @@ -467,8 +464,7 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, - WaitForCancellation: true, + SearchAttributes: mirrorNameSearch, } normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 0b82ca4c22..09849b1752 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -41,7 +41,6 @@ func (s *SyncFlowExecution) executeSyncFlow( syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, - WaitForCancellation: true, }) // execute GetLastSyncedID on destination peer @@ -66,7 +65,6 @@ func (s *SyncFlowExecution) executeSyncFlow( startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: 30 * time.Second, - WaitForCancellation: true, }) // execute StartFlow on the peers to start the flow @@ -85,7 +83,6 @@ func (s *SyncFlowExecution) executeSyncFlow( replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 30 * time.Minute, - WaitForCancellation: true, }) replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ FlowConnectionConfigs: config,