diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index fd73a90285..e9092b7a2b 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -24,15 +24,35 @@ func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error { dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req) var sourceError, destinationError error + var sourceOk, destinationOk bool selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-drop", req.FlowJobName)) - selector.AddFuture(dropSourceFuture, func(f workflow.Future) { + + var dropSource, dropDestination func(f workflow.Future) + dropSource = func(f workflow.Future) { sourceError = f.Get(ctx, nil) - }) - selector.AddFuture(dropDestinationFuture, func(f workflow.Future) { + sourceOk = sourceError == nil + if !sourceOk { + dropSourceFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowSource, req) + selector.AddFuture(dropSourceFuture, dropSource) + _ = workflow.Sleep(ctx, time.Second) + } + } + dropDestination = func(f workflow.Future) { destinationError = f.Get(ctx, nil) - }) - selector.Select(ctx) - selector.Select(ctx) - - return errors.Join(sourceError, destinationError) + destinationOk = destinationError == nil + if !destinationOk { + dropDestinationFuture = workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, req) + selector.AddFuture(dropDestinationFuture, dropDestination) + _ = workflow.Sleep(ctx, time.Second) + } + } + + for { + selector.Select(ctx) + if ctx.Err() != nil { + return errors.Join(sourceError, destinationError) + } else if sourceOk && destinationOk { + return nil + } + } }