Skip to content

Commit

Permalink
Support DROP MIRROR for Query Replication
Browse files Browse the repository at this point in the history
fixes: #476
  • Loading branch information
iskakaushik committed Oct 4, 2023
1 parent b181cff commit 674a416
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
26 changes: 17 additions & 9 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context, req *protos.ShutdownRequest) (*protos.ShutdownResponse, error) {
ctx context.Context,
req *protos.ShutdownRequest,
) (*protos.ShutdownResponse, error) {
err := h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
Expand Down Expand Up @@ -153,9 +155,9 @@ func (h *FlowRequestHandler) ShutdownFlow(

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 5 * time.Second
expBackoff.MaxInterval = 30 * time.Second
expBackoff.MaxElapsedTime = 5 * time.Minute
expBackoff.InitialInterval = 3 * time.Second
expBackoff.MaxInterval = 10 * time.Second
expBackoff.MaxElapsedTime = 1 * time.Minute

// empty will terminate the latest run
runID := ""
Expand All @@ -176,14 +178,20 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI

err := backoff.Retry(operation, expBackoff)
if err != nil {
// terminate workflow if it is still running
reason := "PeerFlow workflow did not close in time"
err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason)
if err != nil {
return h.handleWorkflowNotClosed(ctx, workflowID, runID)
}

return nil
}

func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workflowID, runID string) error {
if err := h.temporalClient.CancelWorkflow(ctx, workflowID, runID); err != nil {
log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error())
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}

return nil
}

Expand Down
14 changes: 9 additions & 5 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -231,14 +232,17 @@ func QRepFlowWorkflow(

// register a signal handler to terminate the workflow
terminateWorkflow := false
signalChan := workflow.GetSignalChannel(ctx, "terminate")
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
var signal string
c.Receive(ctx, &signal)
logger.Info("Received signal to terminate workflow", "Signal", signal)
terminateWorkflow = true
var signalVal shared.CDCFlowSignal
c.Receive(ctx, &signalVal)
logger.Info("received signal", "signal", signalVal)
if signalVal == shared.ShutdownSignal {
logger.Info("received shutdown signal")
terminateWorkflow = true
}
})

// register a query to get the number of partitions processed
Expand Down

0 comments on commit 674a416

Please sign in to comment.