From 63cb1bcbbf0badf5ba0a14284e94a96b7585ac6a Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Mon, 18 Mar 2024 16:37:12 +0530 Subject: [PATCH] DropFlow support for stilgar (#1496) --- flow/cmd/handler.go | 92 +++++++++++++++++++------------------ flow/workflows/qrep_flow.go | 5 +- 2 files changed, 51 insertions(+), 46 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 9655a0bfbb..e2da97139e 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -337,56 +337,58 @@ func (h *FlowRequestHandler) ShutdownFlow( }, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err) } - workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New()) - workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - SearchAttributes: map[string]interface{}{ - shared.MirrorNameSearchAttribute: req.FlowJobName, - }, - } - dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, req) - if err != nil { - slog.Error("unable to start DropFlow workflow", - logs, - slog.Any("error", err)) - return &protos.ShutdownResponse{ - Ok: false, - ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err), - }, fmt.Errorf("unable to start DropFlow workflow: %w", err) - } - - cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() - - errChan := make(chan error, 1) - go func() { - errChan <- dropFlowHandle.Get(cancelCtx, nil) - }() - - select { - case err := <-errChan: - if err != nil { - slog.Error("DropFlow workflow did not execute successfully", - logs, - slog.Any("error", err), - ) - return &protos.ShutdownResponse{ - Ok: false, - ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err), - }, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + if req.SourcePeer.Type == protos.DBType_POSTGRES { + workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New()) + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: h.peerflowTaskQueueID, + SearchAttributes: map[string]interface{}{ + shared.MirrorNameSearchAttribute: req.FlowJobName, + }, } - case <-time.After(5 * time.Minute): - err := h.handleCancelWorkflow(ctx, workflowID, "") + dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, req) if err != nil { - slog.Error("unable to wait for DropFlow workflow to close", + slog.Error("unable to start DropFlow workflow", logs, - slog.Any("error", err), - ) + slog.Any("error", err)) return &protos.ShutdownResponse{ Ok: false, - ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err), - }, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) + ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err), + }, fmt.Errorf("unable to start DropFlow workflow: %w", err) + } + + cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + errChan := make(chan error, 1) + go func() { + errChan <- dropFlowHandle.Get(cancelCtx, nil) + }() + + select { + case err := <-errChan: + if err != nil { + slog.Error("DropFlow workflow did not execute successfully", + logs, + slog.Any("error", err), + ) + return &protos.ShutdownResponse{ + Ok: false, + ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err), + }, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + } + case <-time.After(5 * time.Minute): + err := h.handleCancelWorkflow(ctx, workflowID, "") + if err != nil { + slog.Error("unable to wait for DropFlow workflow to close", + logs, + slog.Any("error", err), + ) + return &protos.ShutdownResponse{ + Ok: false, + ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err), + }, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) + } } } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 0cb560dcd7..0317c467c2 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -517,7 +517,10 @@ func QRepFlowWorkflow( if config.SourcePeer.Type == protos.DBType_SNOWFLAKE && config.WaitBetweenBatchesSeconds > 0 { logger.Info(fmt.Sprintf("copy completed, restarting workflow in %d seconds", config.WaitBetweenBatchesSeconds)) - workflow.Sleep(ctx, time.Duration(config.WaitBetweenBatchesSeconds)*time.Second) + err := workflow.Sleep(ctx, time.Duration(config.WaitBetweenBatchesSeconds)*time.Second) + if err != nil { + return nil + } return workflow.NewContinueAsNewError(ctx, QRepFlowWorkflow, config, state) } logger.Info("initial copy completed for peer flow")