Skip to content

Commit

Permalink
drop_flow: retry until both source/destination succeed (#1201)
Browse files Browse the repository at this point in the history
Dropping replication slot can take some time,
activities outlive workflow cancelation,
so replication slot is often in use for half a minute after cancelation
  • Loading branch information
serprex authored Feb 6, 2024
1 parent e49b4c3 commit 8ba31be
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions flow/workflows/drop_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,35 @@ func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error {
dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)

var sourceError, destinationError error
var sourceOk, destinationOk bool
selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-drop", req.FlowJobName))
selector.AddFuture(dropSourceFuture, func(f workflow.Future) {

var dropSource, dropDestination func(f workflow.Future)
dropSource = func(f workflow.Future) {
sourceError = f.Get(ctx, nil)
})
selector.AddFuture(dropDestinationFuture, func(f workflow.Future) {
sourceOk = sourceError == nil
if !sourceOk {
dropSourceFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req)
selector.AddFuture(dropSourceFuture, dropSource)
_ = workflow.Sleep(ctx, time.Second)
}
}
dropDestination = func(f workflow.Future) {
destinationError = f.Get(ctx, nil)
})
selector.Select(ctx)
selector.Select(ctx)

return errors.Join(sourceError, destinationError)
destinationOk = destinationError == nil
if !destinationOk {
dropDestinationFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req)
selector.AddFuture(dropDestinationFuture, dropDestination)
_ = workflow.Sleep(ctx, time.Second)
}
}

for {
selector.Select(ctx)
if ctx.Err() != nil {
return errors.Join(sourceError, destinationError)
} else if sourceOk && destinationOk {
return nil
}
}
}

0 comments on commit 8ba31be

Please sign in to comment.