-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Runs SendWalHeartbeat in parallel #675
Conversation
@@ -268,6 +256,13 @@ func CDCFlowWorkflowWithConfig( | |||
state.Progress = append(state.Progress, "executed setup flow and snapshot flow") | |||
} | |||
|
|||
heartbeatCancelCtx, cancelHeartbeat := workflow.WithCancel(ctx) | |||
walHeartbeatCtx := workflow.WithActivityOptions(heartbeatCancelCtx, workflow.ActivityOptions{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make start to close timeout much longer like 1 week
flow/workflows/cdc_flow.go
Outdated
StartToCloseTimeout: 5 * time.Minute, | ||
}) | ||
workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg) | ||
//walHeartbeatFuture.Get(ctx, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this
flow/activities/flowable.go
Outdated
err = srcConn.SendWALHeartbeat() | ||
if err != nil { | ||
return fmt.Errorf("failed to send WAL heartbeat: %w", err) | ||
ticker := time.NewTicker(10 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make it once every 10 mins and add a log with flow job name in it
6179b75
to
af2c67c
Compare
af2c67c
to
eed05ff
Compare
SendWalHeartbeat now runs every 10 seconds in parallel during the PeerFlow phase of CDC (after setup and snapshot)
cancelActivity is called where previously the .Get() on the activity was called in
cdc_flow.go