From 8ba31be8ca0b8df0350ae78e7b44cb5a348ce436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 6 Feb 2024 00:21:09 +0000 Subject: [PATCH] drop_flow: retry until both source/destination succeed (#1201) Dropping replication slot can take some time, activities outlive workflow cancelation, so replication slot is often in use for half a minute after cancelation --- flow/workflows/drop_flow.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index fd73a90285..e9092b7a2b 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -24,15 +24,35 @@ func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error { dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req) var sourceError, destinationError error + var sourceOk, destinationOk bool selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-drop", req.FlowJobName)) - selector.AddFuture(dropSourceFuture, func(f workflow.Future) { + + var dropSource, dropDestination func(f workflow.Future) + dropSource = func(f workflow.Future) { sourceError = f.Get(ctx, nil) - }) - selector.AddFuture(dropDestinationFuture, func(f workflow.Future) { + sourceOk = sourceError == nil + if !sourceOk { + dropSourceFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req) + selector.AddFuture(dropSourceFuture, dropSource) + _ = workflow.Sleep(ctx, time.Second) + } + } + dropDestination = func(f workflow.Future) { destinationError = f.Get(ctx, nil) - }) - selector.Select(ctx) - selector.Select(ctx) - - return errors.Join(sourceError, destinationError) + destinationOk = destinationError == nil + if !destinationOk { + dropDestinationFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req) + selector.AddFuture(dropDestinationFuture, dropDestination) + _ = workflow.Sleep(ctx, time.Second) + } + } + + for { + selector.Select(ctx) + if ctx.Err() != nil { + return errors.Join(sourceError, destinationError) + } else if sourceOk && destinationOk { + return nil + } + } }