From baaf3b26eec54b60a4161baba48c1616df28862c Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Wed, 4 Oct 2023 13:23:51 -0400
Subject: [PATCH] Support `DROP MIRROR` for Query Replication

fixes: https://github.com/PeerDB-io/peerdb/issues/476
---
Review notes (commentary area of the patch; not part of the commit message):

* flow/cmd/handler.go, ShutdownFlow: the parameter list is re-wrapped to one
  parameter per line. The visible hunk changes no other token.
* flow/cmd/handler.go, waitForWorkflowClose: the backoff settings change from
  5s / 30s / 5m to 3s / 10s / 1m (InitialInterval / MaxInterval /
  MaxElapsedTime). When backoff.Retry returns an error, the function now
  returns the result of handleWorkflowNotClosed instead of calling
  TerminateWorkflow inline.
* flow/cmd/handler.go, handleWorkflowNotClosed (new): calls CancelWorkflow
  first. Only if that call returns an error does it log the error and call
  TerminateWorkflow. A TerminateWorkflow error is wrapped with %w and
  returned; every other path returns nil.
  NOTE(review): when CancelWorkflow returns nil, the function returns nil
  without checking again that the workflow has closed. Confirm this is
  intended.
  NOTE(review): the termination reason reads "did not cancel in time", but
  that branch is reached when the CancelWorkflow call itself returned an
  error.
* flow/workflows/qrep_flow.go, QRepFlowWorkflow: the signal channel is now
  looked up by shared.CDCFlowSignalName instead of the literal "terminate",
  and the payload is received into a shared.CDCFlowSignal instead of a
  string. Every received value is logged; terminateWorkflow is set only when
  the value equals shared.ShutdownSignal.
  NOTE(review): shared.CDCFlowSignalName is defined outside this patch.
  Verify that no sender still signals the old "terminate" channel name.

 flow/cmd/handler.go         | 26 +++++++++++++++++---------
 flow/workflows/qrep_flow.go | 14 +++++++++-----
 2 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index 510d92b635..1dec54caed 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -110,7 +110,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 }
 
 func (h *FlowRequestHandler) ShutdownFlow(
-	ctx context.Context, req *protos.ShutdownRequest) (*protos.ShutdownResponse, error) {
+	ctx context.Context,
+	req *protos.ShutdownRequest,
+) (*protos.ShutdownResponse, error) {
 	err := h.temporalClient.SignalWorkflow(
 		ctx,
 		req.WorkflowId,
@@ -153,9 +155,9 @@ func (h *FlowRequestHandler) ShutdownFlow(
 
 func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
 	expBackoff := backoff.NewExponentialBackOff()
-	expBackoff.InitialInterval = 5 * time.Second
-	expBackoff.MaxInterval = 30 * time.Second
-	expBackoff.MaxElapsedTime = 5 * time.Minute
+	expBackoff.InitialInterval = 3 * time.Second
+	expBackoff.MaxInterval = 10 * time.Second
+	expBackoff.MaxElapsedTime = 1 * time.Minute
 
 	// empty will terminate the latest run
 	runID := ""
@@ -176,14 +178,20 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI
 
 	err := backoff.Retry(operation, expBackoff)
 	if err != nil {
-		// terminate workflow if it is still running
-		reason := "PeerFlow workflow did not close in time"
-		err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason)
-		if err != nil {
+		return h.handleWorkflowNotClosed(ctx, workflowID, runID)
+	}
+
+	return nil
+}
+
+func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workflowID, runID string) error {
+	if err := h.temporalClient.CancelWorkflow(ctx, workflowID, runID); err != nil {
+		log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error())
+		terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
+		if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
 			return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
 		}
 	}
-
 	return nil
 }
 
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index bceb1fa7f7..0141e4f57f 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/shared"
 	"github.com/google/uuid"
 	"go.temporal.io/api/enums/v1"
 	"go.temporal.io/sdk/log"
@@ -231,14 +232,17 @@ func QRepFlowWorkflow(
 
 	// register a signal handler to terminate the workflow
 	terminateWorkflow := false
-	signalChan := workflow.GetSignalChannel(ctx, "terminate")
+	signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
 
 	s := workflow.NewSelector(ctx)
 	s.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
-		var signal string
-		c.Receive(ctx, &signal)
-		logger.Info("Received signal to terminate workflow", "Signal", signal)
-		terminateWorkflow = true
+		var signalVal shared.CDCFlowSignal
+		c.Receive(ctx, &signalVal)
+		logger.Info("received signal", "signal", signalVal)
+		if signalVal == shared.ShutdownSignal {
+			logger.Info("received shutdown signal")
+			terminateWorkflow = true
+		}
 	})
 
 	// register a query to get the number of partitions processed