Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PAUSE MIRROR support #605

Merged
merged 9 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

state := peerflow.NewCDCFlowState()
state := peerflow.NewCDCFlowWorkflowState()
_, err = h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
Expand Down Expand Up @@ -350,6 +350,37 @@ func (h *FlowRequestHandler) ShutdownFlow(
}, nil
}

func (h *FlowRequestHandler) FlowStateChange(
ctx context.Context,
req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
var err error
if req.RequestedFlowState == protos.FlowState_STATE_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
"",
shared.CDCFlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING {
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
"",
shared.CDCFlowSignalName,
shared.NoopSignal,
)
}
if err != nil {
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
}

return &protos.FlowStateChangeResponse{
Ok: true,
}, nil
}

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 3 * time.Second
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package connpostgres

import (
"context"
"database/sql"
"fmt"
"regexp"
"time"
Expand Down Expand Up @@ -839,7 +838,7 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error {
}
defer func() {
deferErr := syncFlowCleanupTx.Rollback(c.ctx)
if deferErr != sql.ErrTxDone && deferErr != nil {
if deferErr != pgx.ErrTxClosed && deferErr != nil {
log.WithFields(log.Fields{
"flowName": jobName,
}).Errorf("unexpected error while rolling back transaction for flow cleanup: %v", deferErr)
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment,
connectionGen.FlowJobName,
)
if err == nil {
var state peerflow.CDCFlowState
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err != nil {
log.Errorln(err)
Expand Down Expand Up @@ -95,7 +95,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment,
connectionGen.FlowJobName,
)
if err == nil {
var state peerflow.CDCFlowState
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err != nil {
log.Errorln(err)
Expand Down
2 changes: 1 addition & 1 deletion flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

450 changes: 337 additions & 113 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions flow/generated/protos/route_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type ContextKey string
const (
NoopSignal CDCFlowSignal = iota
ShutdownSignal
PauseSignal

EnableMetricsKey ContextKey = "enableMetrics"
CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor"
)
Expand Down
27 changes: 27 additions & 0 deletions flow/utils/signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package util

import (
"github.com/PeerDB-io/peer-flow/shared"
"go.temporal.io/sdk/log"
)

func FlowSignalHandler(activeSignal shared.CDCFlowSignal,
v shared.CDCFlowSignal, logger log.Logger) shared.CDCFlowSignal {
if v == shared.ShutdownSignal {
logger.Info("received shutdown signal")
return v
} else if v == shared.PauseSignal {
logger.Info("received pause signal")
if activeSignal == shared.NoopSignal {
logger.Info("workflow was running, pausing it")
return v
}
} else if v == shared.NoopSignal {
logger.Info("received resume signal")
if activeSignal == shared.PauseSignal {
logger.Info("workflow was paused, resuming it")
return v
}
}
return activeSignal
}
Loading
Loading