Skip to content

Commit

Permalink
PAUSE MIRROR support (#605)
Browse files Browse the repository at this point in the history
`PAUSE MIRROR` supports both CDC and QRep mirrors. This is implemented
by signalling the Temporal workflow. A PauseSignal delivered to the
workflow should pause it, while a NoopSignal delivered to the workflow
should resume it. Signals that don't make sense for the current workflow
state are ignored. Currently supported only via the query layer, not
from the UI. The following caveats exist for now:

1) QRep mirrors cannot be paused during partition reading or
replicating, only after a run completes successfully and the signals are
read. This is usually not an issue but could be problematic for QRep
runs with a lot of partitions.
2) CDC mirrors similarly will only process signals and pause after a
pull + sync + normalize cycle is run. Due to 1), a CDC mirror cannot
pause during the initial snapshot phase, which is implemented as a QRep
mirror internally. This will be marked as unpausable in the UI.
  • Loading branch information
heavycrystal authored Nov 8, 2023
1 parent 8f8fdbd commit 319cde6
Show file tree
Hide file tree
Showing 22 changed files with 1,438 additions and 196 deletions.
33 changes: 32 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

state := peerflow.NewCDCFlowState()
state := peerflow.NewCDCFlowWorkflowState()
_, err = h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
Expand Down Expand Up @@ -350,6 +350,37 @@ func (h *FlowRequestHandler) ShutdownFlow(
}, nil
}

// FlowStateChange asks a workflow to pause or resume by sending it a signal
// through the Temporal client.
//
// A requested state of FlowState_STATE_PAUSED sends shared.PauseSignal and
// FlowState_STATE_RUNNING sends shared.NoopSignal. Both are sent on
// shared.CDCFlowSignalName to req.WorkflowId, with an empty string as the
// third SignalWorkflow argument (the run ID in Temporal's client API).
//
// NOTE(review): any other requested state sends no signal at all and still
// reports Ok: true — confirm that this is what callers expect.
//
// A failure to deliver the signal is returned as a wrapped error.
func (h *FlowRequestHandler) FlowStateChange(
	ctx context.Context,
	req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
	success := &protos.FlowStateChangeResponse{
		Ok: true,
	}

	// Translate the requested state into the signal that produces it.
	var signal shared.CDCFlowSignal
	switch req.RequestedFlowState {
	case protos.FlowState_STATE_PAUSED:
		signal = shared.PauseSignal
	case protos.FlowState_STATE_RUNNING:
		signal = shared.NoopSignal
	default:
		// Nothing to signal for this state.
		return success, nil
	}

	signalErr := h.temporalClient.SignalWorkflow(
		ctx,
		req.WorkflowId,
		"",
		shared.CDCFlowSignalName,
		signal,
	)
	if signalErr != nil {
		return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", signalErr)
	}

	return success, nil
}

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 3 * time.Second
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package connpostgres

import (
"context"
"database/sql"
"fmt"
"regexp"
"time"
Expand Down Expand Up @@ -839,7 +838,7 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error {
}
defer func() {
deferErr := syncFlowCleanupTx.Rollback(c.ctx)
if deferErr != sql.ErrTxDone && deferErr != nil {
if deferErr != pgx.ErrTxClosed && deferErr != nil {
log.WithFields(log.Fields{
"flowName": jobName,
}).Errorf("unexpected error while rolling back transaction for flow cleanup: %v", deferErr)
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment,
connectionGen.FlowJobName,
)
if err == nil {
var state peerflow.CDCFlowState
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err != nil {
log.Errorln(err)
Expand Down Expand Up @@ -95,7 +95,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment,
connectionGen.FlowJobName,
)
if err == nil {
var state peerflow.CDCFlowState
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err != nil {
log.Errorln(err)
Expand Down
2 changes: 1 addition & 1 deletion flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

450 changes: 337 additions & 113 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions flow/generated/protos/route_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type ContextKey string
// Signal values exchanged with flow workflows, followed by context keys.
const (
// NoopSignal is the zero value of CDCFlowSignal (iota starts here).
NoopSignal CDCFlowSignal = iota
// ShutdownSignal is the next CDCFlowSignal value after NoopSignal.
ShutdownSignal
// PauseSignal is the next CDCFlowSignal value after ShutdownSignal.
PauseSignal

// Context keys; presumably used to look up values stored on a
// context.Context — verify against callers.
EnableMetricsKey ContextKey = "enableMetrics"
CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor"
)
Expand Down
27 changes: 27 additions & 0 deletions flow/utils/signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package util

import (
"github.com/PeerDB-io/peer-flow/shared"
"go.temporal.io/sdk/log"
)

// FlowSignalHandler returns the signal state that should be in effect after
// signal v arrives while activeSignal is the current state, logging what was
// received along the way.
//
//   - ShutdownSignal always becomes the new state.
//   - PauseSignal takes effect only when the current state is NoopSignal.
//   - NoopSignal (logged as a "resume") takes effect only when the current
//     state is PauseSignal.
//
// In every other case the incoming signal is ignored and activeSignal is
// returned unchanged.
func FlowSignalHandler(activeSignal shared.CDCFlowSignal,
	v shared.CDCFlowSignal, logger log.Logger) shared.CDCFlowSignal {
	switch v {
	case shared.ShutdownSignal:
		logger.Info("received shutdown signal")
		return shared.ShutdownSignal

	case shared.PauseSignal:
		logger.Info("received pause signal")
		if activeSignal != shared.NoopSignal {
			// Not currently running, so there is nothing to pause.
			return activeSignal
		}
		logger.Info("workflow was running, pausing it")
		return shared.PauseSignal

	case shared.NoopSignal:
		logger.Info("received resume signal")
		if activeSignal != shared.PauseSignal {
			// Not currently paused, so there is nothing to resume.
			return activeSignal
		}
		logger.Info("workflow was paused, resuming it")
		return shared.NoopSignal
	}

	return activeSignal
}
Loading

0 comments on commit 319cde6

Please sign in to comment.