Skip to content

Commit

Permalink
Revert "test reverting WaitForCancellation: true"
Browse files Browse the repository at this point in the history
This reverts commit ae10042.
  • Loading branch information
serprex committed Jan 6, 2024
1 parent c95180e commit 2ea6dbd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
14 changes: 9 additions & 5 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
Expand All @@ -242,8 +243,9 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
TaskQueue: taskQueue,
SearchAttributes: mirrorNameSearch,
TaskQueue: taskQueue,
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
Expand Down Expand Up @@ -392,7 +394,8 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
Expand Down Expand Up @@ -464,7 +467,8 @@ func CDCFlowWorkflowWithConfig(
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 20,
},
SearchAttributes: mirrorNameSearch,
SearchAttributes: mirrorNameSearch,
WaitForCancellation: true,
}
normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts)
childNormalizeFlowFuture := workflow.ExecuteChildWorkflow(
Expand Down
3 changes: 3 additions & 0 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (s *SyncFlowExecution) executeSyncFlow(

syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
WaitForCancellation: true,
})

// execute GetLastSyncedID on destination peer
Expand All @@ -65,6 +66,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 72 * time.Hour,
HeartbeatTimeout: 30 * time.Second,
WaitForCancellation: true,
})

// execute StartFlow on the peers to start the flow
Expand All @@ -83,6 +85,7 @@ func (s *SyncFlowExecution) executeSyncFlow(

replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
WaitForCancellation: true,
})
replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{
FlowConnectionConfigs: config,
Expand Down

0 comments on commit 2ea6dbd

Please sign in to comment.